diff options
author | niksaveliev <nik@saveliev.me> | 2024-02-05 23:32:05 +0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-02-05 23:32:05 +0600 |
commit | 33d4debbe0872b7ed5278719fd720e54d37f198d (patch) | |
tree | ac1c30dd64dc52d9da25d7a65492ffe09efb5805 | |
parent | 47d1a0b3c623adfcb0b6c71a1df359b7edac0fc5 (diff) | |
download | ydb-33d4debbe0872b7ed5278719fd720e54d37f198d.tar.gz |
kcat fixes (#1586)
6 files changed, 40 insertions, 21 deletions
diff --git a/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp index 96c628477a5..dc74d519630 100644 --- a/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp @@ -30,21 +30,21 @@ TApiVersionsResponseData::TPtr GetApiVersions() { TApiVersionsResponseData::TPtr response = std::make_shared<TApiVersionsResponseData>(); response->ErrorCode = EKafkaErrors::NONE_ERROR; - AddApiKey<TProduceRequestData>(response->ApiKeys, PRODUCE, {.MinVersion=3}); - AddApiKey<TApiVersionsRequestData>(response->ApiKeys, API_VERSIONS); - AddApiKey<TMetadataRequestData>(response->ApiKeys, METADATA); - AddApiKey<TInitProducerIdRequestData>(response->ApiKeys, INIT_PRODUCER_ID); - AddApiKey<TSaslHandshakeRequestData>(response->ApiKeys, SASL_HANDSHAKE); - AddApiKey<TSaslAuthenticateRequestData>(response->ApiKeys, SASL_AUTHENTICATE); - AddApiKey<TListOffsetsRequestData>(response->ApiKeys, LIST_OFFSETS); + AddApiKey<TProduceRequestData>(response->ApiKeys, PRODUCE, {.MinVersion=3, .MaxVersion=9}); + AddApiKey<TApiVersionsRequestData>(response->ApiKeys, API_VERSIONS, {.MaxVersion=2}); + AddApiKey<TMetadataRequestData>(response->ApiKeys, METADATA, {.MaxVersion=9}); + AddApiKey<TInitProducerIdRequestData>(response->ApiKeys, INIT_PRODUCER_ID, {.MaxVersion=4}); + AddApiKey<TSaslHandshakeRequestData>(response->ApiKeys, SASL_HANDSHAKE, {.MaxVersion=1}); + AddApiKey<TSaslAuthenticateRequestData>(response->ApiKeys, SASL_AUTHENTICATE, {.MaxVersion=2}); + AddApiKey<TListOffsetsRequestData>(response->ApiKeys, LIST_OFFSETS, {.MinVersion=1, .MaxVersion=1}); AddApiKey<TFetchRequestData>(response->ApiKeys, FETCH, {.MaxVersion=3}); - AddApiKey<TJoinGroupRequestData>(response->ApiKeys, JOIN_GROUP); - AddApiKey<TSyncGroupRequestData>(response->ApiKeys, SYNC_GROUP); - AddApiKey<TLeaveGroupRequestData>(response->ApiKeys, LEAVE_GROUP); - AddApiKey<THeartbeatRequestData>(response->ApiKeys, HEARTBEAT); - AddApiKey<TFindCoordinatorRequestData>(response->ApiKeys, FIND_COORDINATOR); - AddApiKey<TOffsetCommitRequestData>(response->ApiKeys, OFFSET_COMMIT, {.MaxVersion=1}); - AddApiKey<TOffsetFetchRequestData>(response->ApiKeys, OFFSET_FETCH); + AddApiKey<TJoinGroupRequestData>(response->ApiKeys, JOIN_GROUP, {.MaxVersion=9}); + AddApiKey<TSyncGroupRequestData>(response->ApiKeys, SYNC_GROUP, {.MaxVersion=3}); + AddApiKey<TLeaveGroupRequestData>(response->ApiKeys, LEAVE_GROUP, {.MaxVersion=5}); + AddApiKey<THeartbeatRequestData>(response->ApiKeys, HEARTBEAT, {.MaxVersion=4}); + AddApiKey<TFindCoordinatorRequestData>(response->ApiKeys, FIND_COORDINATOR, {.MaxVersion=0}); + AddApiKey<TOffsetCommitRequestData>(response->ApiKeys, OFFSET_COMMIT, {.MaxVersion=0}); + AddApiKey<TOffsetFetchRequestData>(response->ApiKeys, OFFSET_FETCH, {.MaxVersion=8}); return response; } diff --git a/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp index 63709d8385c..c1f473f59f9 100644 --- a/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp @@ -49,6 +49,11 @@ void TKafkaFindCoordinatorActor::SendResponseOkAndDie(const TString& host, i32 p response->Coordinators.push_back(coordinator); } + response->ErrorCode = NONE_ERROR; + response->Host = host; + response->Port = port; + response->NodeId = nodeId; + Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, static_cast<EKafkaErrors>(response->ErrorCode))); Die(ctx); } diff --git a/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp index 01cee28a259..96893aaa59c 100644 --- a/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp @@ -204,6 +204,7 @@ TOffsetFetchResponseData::TPtr TKafkaOffsetFetchActor::GetOffsetFetchResponse() partition.CommittedOffset = sourcePartition.CommittedOffset; partition.PartitionIndex = sourcePartition.PartitionIndex; partition.ErrorCode = sourcePartition.ErrorCode; + topic.Partitions.push_back(partition); } response->Topics.push_back(topic); } diff --git a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp index e788f7ceb73..e8f0a202450 100644 --- a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp @@ -86,8 +86,8 @@ void TKafkaReadSessionActor::HandleJoinGroup(TEvKafka::TEvJoinGroupRequest::TPtr switch (ReadStep) { case WAIT_JOIN_GROUP: { // join first time - if (joinGroupRequest->ProtocolType != SUPPORTED_JOIN_GROUP_PROTOCOL) { - SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << joinGroupRequest->ProtocolType); + if (joinGroupRequest->ProtocolType.has_value() && !joinGroupRequest->ProtocolType.value().empty() && joinGroupRequest->ProtocolType.value() != SUPPORTED_JOIN_GROUP_PROTOCOL) { + SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << joinGroupRequest->ProtocolType.value()); CloseReadSession(ctx); return; } @@ -156,8 +156,8 @@ void TKafkaReadSessionActor::HandleSyncGroup(TEvKafka::TEvSyncGroupRequest::TPtr return; } - if (syncGroupRequest->ProtocolType != SUPPORTED_JOIN_GROUP_PROTOCOL) { - SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << syncGroupRequest->ProtocolType); + if (syncGroupRequest->ProtocolType.has_value() && !syncGroupRequest->ProtocolType.value().empty() && syncGroupRequest->ProtocolType.value() != SUPPORTED_JOIN_GROUP_PROTOCOL) { + SendSyncGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << syncGroupRequest->ProtocolType.value()); CloseReadSession(ctx); return; } @@ -361,9 +361,9 @@ TConsumerProtocolAssignment TKafkaReadSessionActor::BuildAssignmentAndInformBala for (auto part: finalPartitionsToRead) { KAFKA_LOG_D("SYNC_GROUP assigned partition number: " << part); topicPartition.Partitions.push_back(part); - assignment.AssignedPartitions.push_back(topicPartition); partitions.ReadingNow.emplace(part); } + assignment.AssignedPartitions.push_back(topicPartition); } return assignment; diff --git a/ydb/core/kafka_proxy/kafka_consumer_protocol.cpp b/ydb/core/kafka_proxy/kafka_consumer_protocol.cpp index dd2e52eb7a6..ce54125ffd5 100644 --- a/ydb/core/kafka_proxy/kafka_consumer_protocol.cpp +++ b/ydb/core/kafka_proxy/kafka_consumer_protocol.cpp @@ -143,12 +143,20 @@ void TConsumerProtocolAssignment::Read(TKafkaReadable& _readable, TKafkaVersion } void TConsumerProtocolAssignment::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + auto useVarintSize = _version > 3; _version = ASSIGNMENT_VERSION; + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { ythrow yexception() << "Can't write version " << _version << " of TConsumerProtocolAssignment"; } - _writable.writeUnsignedVarint(Size(ASSIGNMENT_VERSION) + 1); + if (useVarintSize) { + _writable.writeUnsignedVarint(Size(_version) + 1); + } else { + TKafkaInt32 size = Size(_version); + _writable << size; + } + _writable << _version; NPrivate::TWriteCollector _collector; NPrivate::Write<AssignedPartitionsMeta>(_collector, _writable, _version, AssignedPartitions); diff --git a/ydb/core/kafka_proxy/kafka_messages.cpp b/ydb/core/kafka_proxy/kafka_messages.cpp index 8b08d98ce47..b68f3cd9ba7 100644 --- a/ydb/core/kafka_proxy/kafka_messages.cpp +++ b/ydb/core/kafka_proxy/kafka_messages.cpp @@ -4517,7 +4517,12 @@ i32 TSyncGroupResponseData::Size(TKafkaVersion _version) const { if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); } - return _collector.Size + NPrivate::SizeOfUnsignedVarint(_assignmentCollector.Size + 1); + auto useVarintSize = _version > 3; + if (useVarintSize) { + return _collector.Size + NPrivate::SizeOfUnsignedVarint(_assignmentCollector.Size + 1); + } else { + return _collector.Size + sizeof(TKafkaInt32); + } } |