aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorniksaveliev <nik@saveliev.me>2024-02-05 23:32:05 +0600
committerGitHub <noreply@github.com>2024-02-05 23:32:05 +0600
commit33d4debbe0872b7ed5278719fd720e54d37f198d (patch)
treeac1c30dd64dc52d9da25d7a65492ffe09efb5805
parent47d1a0b3c623adfcb0b6c71a1df359b7edac0fc5 (diff)
downloadydb-33d4debbe0872b7ed5278719fd720e54d37f198d.tar.gz
kcat fixes (#1586)
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp28
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp5
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp1
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp10
-rw-r--r--ydb/core/kafka_proxy/kafka_consumer_protocol.cpp10
-rw-r--r--ydb/core/kafka_proxy/kafka_messages.cpp7
6 files changed, 40 insertions, 21 deletions
diff --git a/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp
index 96c628477a5..dc74d519630 100644
--- a/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp
@@ -30,21 +30,21 @@ TApiVersionsResponseData::TPtr GetApiVersions() {
TApiVersionsResponseData::TPtr response = std::make_shared<TApiVersionsResponseData>();
response->ErrorCode = EKafkaErrors::NONE_ERROR;
- AddApiKey<TProduceRequestData>(response->ApiKeys, PRODUCE, {.MinVersion=3});
- AddApiKey<TApiVersionsRequestData>(response->ApiKeys, API_VERSIONS);
- AddApiKey<TMetadataRequestData>(response->ApiKeys, METADATA);
- AddApiKey<TInitProducerIdRequestData>(response->ApiKeys, INIT_PRODUCER_ID);
- AddApiKey<TSaslHandshakeRequestData>(response->ApiKeys, SASL_HANDSHAKE);
- AddApiKey<TSaslAuthenticateRequestData>(response->ApiKeys, SASL_AUTHENTICATE);
- AddApiKey<TListOffsetsRequestData>(response->ApiKeys, LIST_OFFSETS);
+ AddApiKey<TProduceRequestData>(response->ApiKeys, PRODUCE, {.MinVersion=3, .MaxVersion=9});
+ AddApiKey<TApiVersionsRequestData>(response->ApiKeys, API_VERSIONS, {.MaxVersion=2});
+ AddApiKey<TMetadataRequestData>(response->ApiKeys, METADATA, {.MaxVersion=9});
+ AddApiKey<TInitProducerIdRequestData>(response->ApiKeys, INIT_PRODUCER_ID, {.MaxVersion=4});
+ AddApiKey<TSaslHandshakeRequestData>(response->ApiKeys, SASL_HANDSHAKE, {.MaxVersion=1});
+ AddApiKey<TSaslAuthenticateRequestData>(response->ApiKeys, SASL_AUTHENTICATE, {.MaxVersion=2});
+ AddApiKey<TListOffsetsRequestData>(response->ApiKeys, LIST_OFFSETS, {.MinVersion=1, .MaxVersion=1});
AddApiKey<TFetchRequestData>(response->ApiKeys, FETCH, {.MaxVersion=3});
- AddApiKey<TJoinGroupRequestData>(response->ApiKeys, JOIN_GROUP);
- AddApiKey<TSyncGroupRequestData>(response->ApiKeys, SYNC_GROUP);
- AddApiKey<TLeaveGroupRequestData>(response->ApiKeys, LEAVE_GROUP);
- AddApiKey<THeartbeatRequestData>(response->ApiKeys, HEARTBEAT);
- AddApiKey<TFindCoordinatorRequestData>(response->ApiKeys, FIND_COORDINATOR);
- AddApiKey<TOffsetCommitRequestData>(response->ApiKeys, OFFSET_COMMIT, {.MaxVersion=1});
- AddApiKey<TOffsetFetchRequestData>(response->ApiKeys, OFFSET_FETCH);
+ AddApiKey<TJoinGroupRequestData>(response->ApiKeys, JOIN_GROUP, {.MaxVersion=9});
+ AddApiKey<TSyncGroupRequestData>(response->ApiKeys, SYNC_GROUP, {.MaxVersion=3});
+ AddApiKey<TLeaveGroupRequestData>(response->ApiKeys, LEAVE_GROUP, {.MaxVersion=5});
+ AddApiKey<THeartbeatRequestData>(response->ApiKeys, HEARTBEAT, {.MaxVersion=4});
+ AddApiKey<TFindCoordinatorRequestData>(response->ApiKeys, FIND_COORDINATOR, {.MaxVersion=0});
+ AddApiKey<TOffsetCommitRequestData>(response->ApiKeys, OFFSET_COMMIT, {.MaxVersion=0});
+ AddApiKey<TOffsetFetchRequestData>(response->ApiKeys, OFFSET_FETCH, {.MaxVersion=8});
return response;
}
diff --git a/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp
index 63709d8385c..c1f473f59f9 100644
--- a/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp
@@ -49,6 +49,11 @@ void TKafkaFindCoordinatorActor::SendResponseOkAndDie(const TString& host, i32 p
response->Coordinators.push_back(coordinator);
}
+ response->ErrorCode = NONE_ERROR;
+ response->Host = host;
+ response->Port = port;
+ response->NodeId = nodeId;
+
Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, static_cast<EKafkaErrors>(response->ErrorCode)));
Die(ctx);
}
diff --git a/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp
index 01cee28a259..96893aaa59c 100644
--- a/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp
@@ -204,6 +204,7 @@ TOffsetFetchResponseData::TPtr TKafkaOffsetFetchActor::GetOffsetFetchResponse()
partition.CommittedOffset = sourcePartition.CommittedOffset;
partition.PartitionIndex = sourcePartition.PartitionIndex;
partition.ErrorCode = sourcePartition.ErrorCode;
+ topic.Partitions.push_back(partition);
}
response->Topics.push_back(topic);
}
diff --git a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp
index e788f7ceb73..e8f0a202450 100644
--- a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp
@@ -86,8 +86,8 @@ void TKafkaReadSessionActor::HandleJoinGroup(TEvKafka::TEvJoinGroupRequest::TPtr
switch (ReadStep) {
case WAIT_JOIN_GROUP: { // join first time
- if (joinGroupRequest->ProtocolType != SUPPORTED_JOIN_GROUP_PROTOCOL) {
- SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << joinGroupRequest->ProtocolType);
+ if (joinGroupRequest->ProtocolType.has_value() && !joinGroupRequest->ProtocolType.value().empty() && joinGroupRequest->ProtocolType.value() != SUPPORTED_JOIN_GROUP_PROTOCOL) {
+ SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << joinGroupRequest->ProtocolType.value());
CloseReadSession(ctx);
return;
}
@@ -156,8 +156,8 @@ void TKafkaReadSessionActor::HandleSyncGroup(TEvKafka::TEvSyncGroupRequest::TPtr
return;
}
- if (syncGroupRequest->ProtocolType != SUPPORTED_JOIN_GROUP_PROTOCOL) {
- SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << syncGroupRequest->ProtocolType);
+ if (syncGroupRequest->ProtocolType.has_value() && !syncGroupRequest->ProtocolType.value().empty() && syncGroupRequest->ProtocolType.value() != SUPPORTED_JOIN_GROUP_PROTOCOL) {
+ SendSyncGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << syncGroupRequest->ProtocolType.value());
CloseReadSession(ctx);
return;
}
@@ -361,9 +361,9 @@ TConsumerProtocolAssignment TKafkaReadSessionActor::BuildAssignmentAndInformBala
for (auto part: finalPartitionsToRead) {
KAFKA_LOG_D("SYNC_GROUP assigned partition number: " << part);
topicPartition.Partitions.push_back(part);
- assignment.AssignedPartitions.push_back(topicPartition);
partitions.ReadingNow.emplace(part);
}
+ assignment.AssignedPartitions.push_back(topicPartition);
}
return assignment;
diff --git a/ydb/core/kafka_proxy/kafka_consumer_protocol.cpp b/ydb/core/kafka_proxy/kafka_consumer_protocol.cpp
index dd2e52eb7a6..ce54125ffd5 100644
--- a/ydb/core/kafka_proxy/kafka_consumer_protocol.cpp
+++ b/ydb/core/kafka_proxy/kafka_consumer_protocol.cpp
@@ -143,12 +143,20 @@ void TConsumerProtocolAssignment::Read(TKafkaReadable& _readable, TKafkaVersion
}
void TConsumerProtocolAssignment::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ auto useVarintSize = _version > 3;
_version = ASSIGNMENT_VERSION;
+
if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
ythrow yexception() << "Can't write version " << _version << " of TConsumerProtocolAssignment";
}
- _writable.writeUnsignedVarint(Size(ASSIGNMENT_VERSION) + 1);
+ if (useVarintSize) {
+ _writable.writeUnsignedVarint(Size(_version) + 1);
+ } else {
+ TKafkaInt32 size = Size(_version);
+ _writable << size;
+ }
+
_writable << _version;
NPrivate::TWriteCollector _collector;
NPrivate::Write<AssignedPartitionsMeta>(_collector, _writable, _version, AssignedPartitions);
diff --git a/ydb/core/kafka_proxy/kafka_messages.cpp b/ydb/core/kafka_proxy/kafka_messages.cpp
index 8b08d98ce47..b68f3cd9ba7 100644
--- a/ydb/core/kafka_proxy/kafka_messages.cpp
+++ b/ydb/core/kafka_proxy/kafka_messages.cpp
@@ -4517,7 +4517,12 @@ i32 TSyncGroupResponseData::Size(TKafkaVersion _version) const {
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
_collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
}
- return _collector.Size + NPrivate::SizeOfUnsignedVarint(_assignmentCollector.Size + 1);
+ auto useVarintSize = _version > 3;
+ if (useVarintSize) {
+ return _collector.Size + NPrivate::SizeOfUnsignedVarint(_assignmentCollector.Size + 1);
+ } else {
+ return _collector.Size + sizeof(TKafkaInt32);
+ }
}