aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorniksaveliev <nik@saveliev.me>2024-01-29 19:29:23 +0600
committerGitHub <noreply@github.com>2024-01-29 19:29:23 +0600
commit156cd635a6cb33112115f06b02edc62fc9847e32 (patch)
tree877b6d432b27e8dafd9afb1360d72fadd52f286c
parent2dc957da7e73048f7fe249115fc64235ec46076c (diff)
downloadydb-156cd635a6cb33112115f06b02edc62fc9847e32.tar.gz
Fix kafka metadata actor for Topics.size() == 0 and fix read by short topic name (#1287)
* Fix kafka metadata actor for Topics.size() == 0 and fix read by short topic name * Fixes
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp68
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_metadata_actor.h9
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp4
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp13
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_read_session_actor.h1
5 files changed, 71 insertions, 24 deletions
diff --git a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
index 5117f70993..9a9029a9f2 100644
--- a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
@@ -19,14 +19,45 @@ NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context,
void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) {
Response->Topics.resize(Message->Topics.size());
+ Response->ClusterId = "ydb-cluster";
+ Response->ControllerId = 1;
+
+ if (WithProxy) {
+ AddProxyNodeToBrokers();
+ }
+
+ if (Message->Topics.size() == 0 && !WithProxy) {
+ AddCurrentNodeToBrokers();
+ }
+
+ if (Message->Topics.size() != 0) {
+ ProcessTopics();
+ }
+
+ Become(&TKafkaMetadataActor::StateWork);
+ RespondIfRequired(ctx);
+}
+
+void TKafkaMetadataActor::AddCurrentNodeToBrokers() {
+ PendingResponses++;
+ Send(NKikimr::NIcNodeCache::CreateICNodesInfoCacheServiceId(), new NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoRequest());
+}
+
+void TKafkaMetadataActor::AddProxyNodeToBrokers() {
+ auto broker = TMetadataResponseData::TMetadataResponseBroker{};
+ broker.NodeId = ProxyNodeId;
+ broker.Host = Context->Config.GetProxy().GetHostname();
+ broker.Port = Context->Config.GetProxy().GetPort();
+ Response->Brokers.emplace_back(std::move(broker));
+}
+
+void TKafkaMetadataActor::ProcessTopics() {
THashMap<TString, TActorId> partitionActors;
for (size_t i = 0; i < Message->Topics.size(); ++i) {
Response->Topics[i] = TMetadataResponseData::TMetadataResponseTopic{};
auto& reqTopic = Message->Topics[i];
Response->Topics[i].Name = reqTopic.Name.value_or("");
- Response->ClusterId = "ydb-cluster";
- Response->ControllerId = 1;
if (!reqTopic.Name.value_or("")) {
AddTopicError(Response->Topics[i], EKafkaErrors::INVALID_TOPIC_EXCEPTION);
@@ -43,8 +74,21 @@ void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) {
}
TopicIndexes[child].push_back(i);
}
- Become(&TKafkaMetadataActor::StateWork);
+}
+void TKafkaMetadataActor::HandleNodesResponse(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev, const NActors::TActorContext& ctx) {
+ auto iter = ev->Get()->NodeIdsMapping->find(ctx.SelfID.NodeId());
+ Y_ABORT_UNLESS(!iter.IsEnd());
+ auto host = (*ev->Get()->Nodes)[iter->second].Host;
+ KAFKA_LOG_D("Incoming TEvGetAllNodesInfoResponse. Host#: " << host);
+
+ auto broker = TMetadataResponseData::TMetadataResponseBroker{};
+ broker.NodeId = ctx.SelfID.NodeId();
+ broker.Host = host;
+ broker.Port = Context->Config.GetListeningPort();
+ Response->Brokers.emplace_back(std::move(broker));
+
+ --PendingResponses;
RespondIfRequired(ctx);
}
@@ -69,21 +113,11 @@ void TKafkaMetadataActor::AddTopicError(
}
void TKafkaMetadataActor::AddTopicResponse(TMetadataResponseData::TMetadataResponseTopic& topic, TEvLocationResponse* response) {
- bool withProxy = Context->Config.HasProxy() && !Context->Config.GetProxy().GetHostname().Empty();
-
topic.ErrorCode = NONE_ERROR;
- //topic.TopicId = TKafkaUuid(response->SchemeShardId, response->PathId);
- if (withProxy) {
- auto broker = TMetadataResponseData::TMetadataResponseBroker{};
- broker.NodeId = ProxyNodeId;
- broker.Host = Context->Config.GetProxy().GetHostname();
- broker.Port = Context->Config.GetProxy().GetPort();
- Response->Brokers.emplace_back(std::move(broker));
- }
topic.Partitions.reserve(response->Partitions.size());
for (const auto& part : response->Partitions) {
- auto nodeId = withProxy ? ProxyNodeId : part.NodeId;
+ auto nodeId = WithProxy ? ProxyNodeId : part.NodeId;
TMetadataResponseData::TMetadataResponseTopic::PartitionsMeta::ItemType responsePartition;
responsePartition.PartitionIndex = part.PartitionId;
@@ -95,7 +129,7 @@ void TKafkaMetadataActor::AddTopicResponse(TMetadataResponseData::TMetadataRespo
topic.Partitions.emplace_back(std::move(responsePartition));
- if (!withProxy) {
+ if (!WithProxy) {
auto ins = AllClusterNodes.insert(part.NodeId);
if (ins.second) {
auto hostname = part.Hostname;
@@ -123,12 +157,12 @@ void TKafkaMetadataActor::HandleResponse(TEvLocationResponse::TPtr ev, const TAc
Y_DEBUG_ABORT_UNLESS(!actorIter->second.empty());
if (actorIter.IsEnd()) {
- KAFKA_LOG_CRIT("Metadata actor: got unexpected location response, ignoring. Expect malformed/incompled reply");
+ KAFKA_LOG_CRIT("Got unexpected location response, ignoring. Expect malformed/incompled reply");
return RespondIfRequired(ctx);
}
if (actorIter->second.empty()) {
- KAFKA_LOG_CRIT("Metadata actor: corrupted state (empty actorId in mapping). Ignored location response, expect incomplete reply");
+ KAFKA_LOG_CRIT("Corrupted state (empty actorId in mapping). Ignored location response, expect incomplete reply");
return RespondIfRequired(ctx);
}
diff --git a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h
index ad00d1071c..38d3058996 100644
--- a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h
+++ b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h
@@ -1,8 +1,10 @@
#include "actors.h"
+#include <ydb/core/kafka_proxy/kafka_events.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/aclib/aclib.h>
#include <ydb/services/persqueue_v1/actors/events.h>
+#include <ydb/services/persqueue_v1/actors/schema_actors.h>
namespace NKafka {
@@ -12,6 +14,7 @@ public:
: Context(context)
, CorrelationId(correlationId)
, Message(message)
+ , WithProxy(context->Config.HasProxy() && !context->Config.GetProxy().GetHostname().Empty())
, Response(new TMetadataResponseData())
{}
@@ -22,14 +25,19 @@ private:
TActorId SendTopicRequest(const TMetadataRequestData::TMetadataRequestTopic& topicRequest);
void HandleResponse(TEvLocationResponse::TPtr ev, const NActors::TActorContext& ctx);
+ void HandleNodesResponse(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev, const NActors::TActorContext& ctx);
void AddTopicResponse(TMetadataResponseData::TMetadataResponseTopic& topic, TEvLocationResponse* response);
void AddTopicError(TMetadataResponseData::TMetadataResponseTopic& topic, EKafkaErrors errorCode);
void RespondIfRequired(const NActors::TActorContext& ctx);
+ void AddProxyNodeToBrokers();
+ void AddCurrentNodeToBrokers();
+ void ProcessTopics();
STATEFN(StateWork) {
switch (ev->GetTypeRewrite()) {
HFunc(TEvLocationResponse, HandleResponse);
+ HFunc(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse, HandleNodesResponse);
}
}
@@ -39,6 +47,7 @@ private:
const TContext::TPtr Context;
const ui64 CorrelationId;
const TMessagePtr<TMetadataRequestData> Message;
+ const bool WithProxy;
ui64 PendingResponses = 0;
diff --git a/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp
index 4a4babf95a..1c31a21c26 100644
--- a/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp
@@ -130,9 +130,9 @@ void TKafkaOffsetCommitActor::Handle(NGRpcProxy::V1::TEvPQProxy::TEvAuthResultOk
void TKafkaOffsetCommitActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx) {
const auto& partitionResult = ev->Get()->Record.GetPartitionResponse();
auto requestInfo = CookieToRequestInfo.find(partitionResult.GetCookie());
- requestInfo->second.Done = true;
-
Y_ABORT_UNLESS(requestInfo != CookieToRequestInfo.end());
+
+ requestInfo->second.Done = true;
if (ev->Get()->Record.GetErrorCode() != NPersQueue::NErrorCode::OK) {
KAFKA_LOG_CRIT("Commit offset error. status# " << EErrorCode_Name(ev->Get()->Record.GetErrorCode()) << ", reason# " << ev->Get()->Record.GetErrorReason());
}
diff --git a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp
index c30cab1582..e788f7ceb7 100644
--- a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp
@@ -319,6 +319,7 @@ bool TKafkaReadSessionActor::CheckHeartbeatIsExpired() {
bool TKafkaReadSessionActor::TryFillTopicsToRead(const TMessagePtr<TJoinGroupRequestData> joinGroupRequestData, THashSet<TString>& topics) {
auto supportedProtocolFound = false;
for (auto protocol: joinGroupRequestData->Protocols) {
+ KAFKA_LOG_D("JOIN_GROUP assign protocol supported by client: " << protocol.Name);
if (protocol.Name == SUPPORTED_ASSIGN_STRATEGY) {
FillTopicsFromJoinGroupMetadata(protocol.Metadata, topics);
supportedProtocolFound = true;
@@ -339,7 +340,7 @@ TConsumerProtocolAssignment TKafkaReadSessionActor::BuildAssignmentAndInformBala
THashSet<ui64> finalPartitionsToRead;
TConsumerProtocolAssignment::TopicPartition topicPartition;
- topicPartition.Topic = topicName;
+ topicPartition.Topic = OriginalTopicNames[topicName];
for (auto part: partitions.ToLock) {
finalPartitionsToRead.emplace(part);
}
@@ -379,7 +380,9 @@ void TKafkaReadSessionActor::FillTopicsFromJoinGroupMetadata(TKafkaBytes& metada
for (auto topic: result.Topics) {
if (topic.has_value()) {
- topics.emplace(NormalizePath(Context->DatabasePath, topic.value()));
+ auto normalizedTopicName = NormalizePath(Context->DatabasePath, topic.value());
+ OriginalTopicNames[normalizedTopicName] = topic.value();
+ topics.emplace(normalizedTopicName);
KAFKA_LOG_D("JOIN_GROUP requested topic to read: " << topic);
}
}
@@ -536,9 +539,9 @@ void TKafkaReadSessionActor::HandleLockPartition(TEvPersQueue::TEvLockPartition:
return;
}
- const auto name = converterIter->second->GetInternalName();
+ const auto topicName = converterIter->second->GetInternalName();
- auto topicInfoIt = TopicsInfo.find(name);
+ auto topicInfoIt = TopicsInfo.find(topicName);
if (topicInfoIt == TopicsInfo.end() || (topicInfoIt->second.PipeClient != ActorIdFromProto(record.GetPipeClient()))) {
KAFKA_LOG_I("ignored ev lock topic# " << record.GetTopic()
<< ", partition# " << record.GetPartition()
@@ -549,7 +552,7 @@ void TKafkaReadSessionActor::HandleLockPartition(TEvPersQueue::TEvLockPartition:
TNewPartitionToLockInfo partitionToLock;
partitionToLock.LockOn = ctx.Now() + LOCK_PARTITION_DELAY;
partitionToLock.PartitionId = record.GetPartition();
- NewPartitionsToLockOnTime[name].push_back(partitionToLock);
+ NewPartitionsToLockOnTime[topicName].push_back(partitionToLock);
}
void TKafkaReadSessionActor::HandleReleasePartition(TEvPersQueue::TEvReleasePartition::TPtr& ev, const TActorContext& ctx) {
diff --git a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.h b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.h
index 45f704a43c..e8f29318e7 100644
--- a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.h
+++ b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.h
@@ -174,6 +174,7 @@ private:
THashMap<TString, NGRpcProxy::TTopicHolder> TopicsInfo; // topic -> info
NPersQueue::TTopicsToConverter TopicsToConverter;
THashSet<TString> TopicsToReadNames;
+ THashMap<TString, TString> OriginalTopicNames;
THashMap<TString, TPartitionsInfo> TopicPartitions;
THashMap<TString, NPersQueue::TTopicConverterPtr> FullPathToConverter; // PrimaryFullPath -> Converter, for balancer replies matching
THashMap<TString, TVector<TNewPartitionToLockInfo>> NewPartitionsToLockOnTime; // Topic -> PartitionsToLock