| author | niksaveliev <nik@saveliev.me> | 2024-01-29 19:29:23 +0600 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-01-29 19:29:23 +0600 |
| commit | 156cd635a6cb33112115f06b02edc62fc9847e32 (patch) | |
| tree | 877b6d432b27e8dafd9afb1360d72fadd52f286c | |
| parent | 2dc957da7e73048f7fe249115fc64235ec46076c (diff) | |
| download | ydb-156cd635a6cb33112115f06b02edc62fc9847e32.tar.gz | |
Fix kafka metadata actor for Topics.size() == 0 and fix read by short topic name (#1287)
* Fix kafka metadata actor for Topics.size() == 0 and fix read by short topic name
* Fixes
5 files changed, 71 insertions, 24 deletions
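For context, a Kafka MetadataRequest with an empty topic array asks only for cluster and broker information, so the response must still carry at least one broker even though no topics are returned. Before this change the broker list was only filled while iterating over the requested topics, so an empty request produced an empty broker list. The sketch below condenses the broker-selection rule the patch introduces into a standalone example; the types and names are illustrative stand-ins, not the actual YDB classes.

```cpp
#include <cstddef>
#include <cstdio>
#include <string>
#include <vector>

// Illustrative condensation of the rule the patch adds to the metadata actor:
// always advertise the configured proxy if there is one; otherwise, when the
// request names no topics, advertise the node handling the request.
struct TBroker {
    int NodeId;
    std::string Host;
    int Port;
};

std::vector<TBroker> SelectBrokers(std::size_t requestedTopics, bool withProxy,
                                   const TBroker& proxy, const TBroker& self) {
    std::vector<TBroker> brokers;
    if (withProxy) {
        brokers.push_back(proxy);   // proxy configured: it is the only advertised broker
    } else if (requestedTopics == 0) {
        brokers.push_back(self);    // empty topic list: report the current node
    }
    // When topics are requested and no proxy is configured, brokers are added
    // per partition while the topic locations are resolved (see the diff below).
    return brokers;
}

int main() {
    TBroker proxy{1, "kafka-proxy.example", 9092};
    TBroker self{42, "node-42.example", 19092};

    for (std::size_t topics : {0u, 3u}) {
        auto brokers = SelectBrokers(topics, /*withProxy=*/false, proxy, self);
        std::printf("topics=%zu -> %zu broker(s)\n", topics, brokers.size());
    }
}
```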
diff --git a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
index 5117f70993..9a9029a9f2 100644
--- a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
@@ -19,14 +19,45 @@ NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context,
 void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) {
     Response->Topics.resize(Message->Topics.size());
+    Response->ClusterId = "ydb-cluster";
+    Response->ControllerId = 1;
+
+    if (WithProxy) {
+        AddProxyNodeToBrokers();
+    }
+
+    if (Message->Topics.size() == 0 && !WithProxy) {
+        AddCurrentNodeToBrokers();
+    }
+
+    if (Message->Topics.size() != 0) {
+        ProcessTopics();
+    }
+
+    Become(&TKafkaMetadataActor::StateWork);
+    RespondIfRequired(ctx);
+}
+
+void TKafkaMetadataActor::AddCurrentNodeToBrokers() {
+    PendingResponses++;
+    Send(NKikimr::NIcNodeCache::CreateICNodesInfoCacheServiceId(), new NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoRequest());
+}
+
+void TKafkaMetadataActor::AddProxyNodeToBrokers() {
+    auto broker = TMetadataResponseData::TMetadataResponseBroker{};
+    broker.NodeId = ProxyNodeId;
+    broker.Host = Context->Config.GetProxy().GetHostname();
+    broker.Port = Context->Config.GetProxy().GetPort();
+    Response->Brokers.emplace_back(std::move(broker));
+}
+
+void TKafkaMetadataActor::ProcessTopics() {
     THashMap<TString, TActorId> partitionActors;
 
     for (size_t i = 0; i < Message->Topics.size(); ++i) {
         Response->Topics[i] = TMetadataResponseData::TMetadataResponseTopic{};
         auto& reqTopic = Message->Topics[i];
         Response->Topics[i].Name = reqTopic.Name.value_or("");
 
-        Response->ClusterId = "ydb-cluster";
-        Response->ControllerId = 1;
 
         if (!reqTopic.Name.value_or("")) {
             AddTopicError(Response->Topics[i], EKafkaErrors::INVALID_TOPIC_EXCEPTION);
@@ -43,8 +74,21 @@ void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) {
         }
         TopicIndexes[child].push_back(i);
     }
-    Become(&TKafkaMetadataActor::StateWork);
+}
 
+void TKafkaMetadataActor::HandleNodesResponse(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev, const NActors::TActorContext& ctx) {
+    auto iter = ev->Get()->NodeIdsMapping->find(ctx.SelfID.NodeId());
+    Y_ABORT_UNLESS(!iter.IsEnd());
+    auto host = (*ev->Get()->Nodes)[iter->second].Host;
+    KAFKA_LOG_D("Incoming TEvGetAllNodesInfoResponse. Host#: " << host);
+
+    auto broker = TMetadataResponseData::TMetadataResponseBroker{};
+    broker.NodeId = ctx.SelfID.NodeId();
+    broker.Host = host;
+    broker.Port = Context->Config.GetListeningPort();
+    Response->Brokers.emplace_back(std::move(broker));
+
+    --PendingResponses;
     RespondIfRequired(ctx);
 }
 
@@ -69,21 +113,11 @@ void TKafkaMetadataActor::AddTopicError(
 }
 
 void TKafkaMetadataActor::AddTopicResponse(TMetadataResponseData::TMetadataResponseTopic& topic, TEvLocationResponse* response) {
-    bool withProxy = Context->Config.HasProxy() && !Context->Config.GetProxy().GetHostname().Empty();
-
     topic.ErrorCode = NONE_ERROR;
-
     //topic.TopicId = TKafkaUuid(response->SchemeShardId, response->PathId);
-    if (withProxy) {
-        auto broker = TMetadataResponseData::TMetadataResponseBroker{};
-        broker.NodeId = ProxyNodeId;
-        broker.Host = Context->Config.GetProxy().GetHostname();
-        broker.Port = Context->Config.GetProxy().GetPort();
-        Response->Brokers.emplace_back(std::move(broker));
-    }
     topic.Partitions.reserve(response->Partitions.size());
 
     for (const auto& part : response->Partitions) {
-        auto nodeId = withProxy ? ProxyNodeId : part.NodeId;
+        auto nodeId = WithProxy ? ProxyNodeId : part.NodeId;
         TMetadataResponseData::TMetadataResponseTopic::PartitionsMeta::ItemType responsePartition;
         responsePartition.PartitionIndex = part.PartitionId;
@@ -95,7 +129,7 @@ void TKafkaMetadataActor::AddTopicResponse(TMetadataResponseData::TMetadataRespo
 
         topic.Partitions.emplace_back(std::move(responsePartition));
 
-        if (!withProxy) {
+        if (!WithProxy) {
             auto ins = AllClusterNodes.insert(part.NodeId);
             if (ins.second) {
                 auto hostname = part.Hostname;
@@ -123,12 +157,12 @@ void TKafkaMetadataActor::HandleResponse(TEvLocationResponse::TPtr ev, const TAc
     Y_DEBUG_ABORT_UNLESS(!actorIter->second.empty());
 
     if (actorIter.IsEnd()) {
-        KAFKA_LOG_CRIT("Metadata actor: got unexpected location response, ignoring. Expect malformed/incompled reply");
+        KAFKA_LOG_CRIT("Got unexpected location response, ignoring. Expect malformed/incompled reply");
         return RespondIfRequired(ctx);
     }
 
     if (actorIter->second.empty()) {
-        KAFKA_LOG_CRIT("Metadata actor: corrupted state (empty actorId in mapping). Ignored location response, expect incomplete reply");
+        KAFKA_LOG_CRIT("Corrupted state (empty actorId in mapping). Ignored location response, expect incomplete reply");
         return RespondIfRequired(ctx);
     }
diff --git a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h
index ad00d1071c..38d3058996 100644
--- a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h
+++ b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h
@@ -1,8 +1,10 @@
 #include "actors.h"
 
+#include <ydb/core/kafka_proxy/kafka_events.h>
 #include <ydb/library/actors/core/actor_bootstrapped.h>
 #include <ydb/library/aclib/aclib.h>
 #include <ydb/services/persqueue_v1/actors/events.h>
+#include <ydb/services/persqueue_v1/actors/schema_actors.h>
 
 namespace NKafka {
@@ -12,6 +14,7 @@ public:
         : Context(context)
         , CorrelationId(correlationId)
         , Message(message)
+        , WithProxy(context->Config.HasProxy() && !context->Config.GetProxy().GetHostname().Empty())
         , Response(new TMetadataResponseData())
     {}
@@ -22,14 +25,19 @@ private:
     TActorId SendTopicRequest(const TMetadataRequestData::TMetadataRequestTopic& topicRequest);
     void HandleResponse(TEvLocationResponse::TPtr ev, const NActors::TActorContext& ctx);
+    void HandleNodesResponse(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev, const NActors::TActorContext& ctx);
 
     void AddTopicResponse(TMetadataResponseData::TMetadataResponseTopic& topic, TEvLocationResponse* response);
     void AddTopicError(TMetadataResponseData::TMetadataResponseTopic& topic, EKafkaErrors errorCode);
     void RespondIfRequired(const NActors::TActorContext& ctx);
+    void AddProxyNodeToBrokers();
+    void AddCurrentNodeToBrokers();
+    void ProcessTopics();
 
     STATEFN(StateWork) {
         switch (ev->GetTypeRewrite()) {
             HFunc(TEvLocationResponse, HandleResponse);
+            HFunc(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse, HandleNodesResponse);
         }
     }
@@ -39,6 +47,7 @@ private:
     const TContext::TPtr Context;
     const ui64 CorrelationId;
     const TMessagePtr<TMetadataRequestData> Message;
+    const bool WithProxy;
 
     ui64 PendingResponses = 0;
diff --git a/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp
index 4a4babf95a..1c31a21c26 100644
--- a/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp
@@ -130,9 +130,9 @@ void TKafkaOffsetCommitActor::Handle(NGRpcProxy::V1::TEvPQProxy::TEvAuthResultOk
 void TKafkaOffsetCommitActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx) {
     const auto& partitionResult = ev->Get()->Record.GetPartitionResponse();
     auto requestInfo = CookieToRequestInfo.find(partitionResult.GetCookie());
-    requestInfo->second.Done = true;
-
     Y_ABORT_UNLESS(requestInfo != CookieToRequestInfo.end());
+
+    requestInfo->second.Done = true;
     if (ev->Get()->Record.GetErrorCode() != NPersQueue::NErrorCode::OK) {
         KAFKA_LOG_CRIT("Commit offset error. status# " << EErrorCode_Name(ev->Get()->Record.GetErrorCode()) << ", reason# " << ev->Get()->Record.GetErrorReason());
     }
diff --git a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp
index c30cab1582..e788f7ceb7 100644
--- a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp
@@ -319,6 +319,7 @@ bool TKafkaReadSessionActor::CheckHeartbeatIsExpired() {
 bool TKafkaReadSessionActor::TryFillTopicsToRead(const TMessagePtr<TJoinGroupRequestData> joinGroupRequestData, THashSet<TString>& topics) {
     auto supportedProtocolFound = false;
     for (auto protocol: joinGroupRequestData->Protocols) {
+        KAFKA_LOG_D("JOIN_GROUP assign protocol supported by client: " << protocol.Name);
         if (protocol.Name == SUPPORTED_ASSIGN_STRATEGY) {
             FillTopicsFromJoinGroupMetadata(protocol.Metadata, topics);
             supportedProtocolFound = true;
@@ -339,7 +340,7 @@ TConsumerProtocolAssignment TKafkaReadSessionActor::BuildAssignmentAndInformBala
         THashSet<ui64> finalPartitionsToRead;
 
         TConsumerProtocolAssignment::TopicPartition topicPartition;
-        topicPartition.Topic = topicName;
+        topicPartition.Topic = OriginalTopicNames[topicName];
         for (auto part: partitions.ToLock) {
             finalPartitionsToRead.emplace(part);
         }
@@ -379,7 +380,9 @@ void TKafkaReadSessionActor::FillTopicsFromJoinGroupMetadata(TKafkaBytes& metada
 
     for (auto topic: result.Topics) {
         if (topic.has_value()) {
-            topics.emplace(NormalizePath(Context->DatabasePath, topic.value()));
+            auto normalizedTopicName = NormalizePath(Context->DatabasePath, topic.value());
+            OriginalTopicNames[normalizedTopicName] = topic.value();
+            topics.emplace(normalizedTopicName);
             KAFKA_LOG_D("JOIN_GROUP requested topic to read: " << topic);
         }
     }
@@ -536,9 +539,9 @@ void TKafkaReadSessionActor::HandleLockPartition(TEvPersQueue::TEvLockPartition:
         return;
     }
 
-    const auto name = converterIter->second->GetInternalName();
+    const auto topicName = converterIter->second->GetInternalName();
 
-    auto topicInfoIt = TopicsInfo.find(name);
+    auto topicInfoIt = TopicsInfo.find(topicName);
     if (topicInfoIt == TopicsInfo.end() || (topicInfoIt->second.PipeClient != ActorIdFromProto(record.GetPipeClient()))) {
         KAFKA_LOG_I("ignored ev lock topic# " << record.GetTopic() << ", partition# " << record.GetPartition()
@@ -549,7 +552,7 @@ void TKafkaReadSessionActor::HandleLockPartition(TEvPersQueue::TEvLockPartition:
     TNewPartitionToLockInfo partitionToLock;
     partitionToLock.LockOn = ctx.Now() + LOCK_PARTITION_DELAY;
     partitionToLock.PartitionId = record.GetPartition();
-    NewPartitionsToLockOnTime[name].push_back(partitionToLock);
+    NewPartitionsToLockOnTime[topicName].push_back(partitionToLock);
 }
 
 void TKafkaReadSessionActor::HandleReleasePartition(TEvPersQueue::TEvReleasePartition::TPtr& ev, const TActorContext& ctx) {
diff --git a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.h b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.h
index 45f704a43c..e8f29318e7 100644
--- a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.h
+++ b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.h
@@ -174,6 +174,7 @@ private:
     THashMap<TString, NGRpcProxy::TTopicHolder> TopicsInfo; // topic -> info
     NPersQueue::TTopicsToConverter TopicsToConverter;
     THashSet<TString> TopicsToReadNames;
+    THashMap<TString, TString> OriginalTopicNames;
     THashMap<TString, TPartitionsInfo> TopicPartitions;
     THashMap<TString, NPersQueue::TTopicConverterPtr> FullPathToConverter; // PrimaryFullPath -> Converter, for balancer replies matching
     THashMap<TString, TVector<TNewPartitionToLockInfo>> NewPartitionsToLockOnTime; // Topic -> PartitionsToLock
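The second half of the fix concerns reads by a short topic name. The read session actor normalizes the client-supplied name against the database path for its internal bookkeeping, but the assignment returned to the Kafka client must echo the name exactly as the client wrote it; the patch records the original spelling in OriginalTopicNames and uses it when building TConsumerProtocolAssignment. Below is a minimal standalone sketch of that normalize-and-remember pattern; the NormalizePath function here is a simplified stand-in for the real helper, not its actual implementation.

```cpp
#include <cstdio>
#include <string>
#include <unordered_map>

// Simplified stand-in for the real NormalizePath helper: prefix short names
// with the database path, leave absolute paths untouched.
static std::string NormalizePath(const std::string& database, const std::string& topic) {
    if (!topic.empty() && topic.front() == '/') {
        return topic;
    }
    return database + "/" + topic;
}

int main() {
    const std::string database = "/Root/test";
    std::unordered_map<std::string, std::string> originalTopicNames;

    // Client joins a group and asks to read a topic by its short name.
    const std::string clientName = "my-topic";
    const std::string normalized = NormalizePath(database, clientName);
    originalTopicNames[normalized] = clientName;   // remember the client's spelling

    // Internal state (locks, partitions, balancer replies) is keyed by `normalized`,
    // but the assignment sent back to the client uses the original name again.
    std::printf("internal key: %s\n", normalized.c_str());
    std::printf("assignment topic: %s\n", originalTopicNames[normalized].c_str());
}
```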