diff options
| author | sergeyveselov <[email protected]> | 2023-12-11 23:13:46 +0300 |
|---|---|---|
| committer | sergeyveselov <[email protected]> | 2023-12-11 23:44:41 +0300 |
| commit | b8c6f928ac8754689d03f515c41d55073326a5fd (patch) | |
| tree | 0636f7ff2bc22807fd0f5457deb76ed965b15f57 | |
| parent | 37e0c276d8f499c053b0071970902cee318a1700 (diff) | |
Handle RaiseError calls in OffsetFetchActor
| -rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp | 92 | ||||
| -rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.h | 17 | ||||
| -rw-r--r-- | ydb/core/kafka_proxy/kafka_events.h | 7 | ||||
| -rw-r--r-- | ydb/core/kafka_proxy/ut/ut_protocol.cpp | 90 |
4 files changed, 170 insertions, 36 deletions
diff --git a/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp index 77cb1c151f4..b6d537aed64 100644 --- a/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp @@ -41,8 +41,8 @@ class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor< std::shared_ptr<TSet<ui32>> partitions) : TBase(request, requester) , TDescribeTopicActorImpl(ConsumerOffsetSettings(consumers, partitions)) - , Requester_(requester) - , TopicName_(request.Topic) + , Requester(requester) + , TopicName(request.Topic) { Y_UNUSED(requester); }; @@ -63,7 +63,6 @@ class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor< } } - // Noop void RaiseError(const TString& error, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode, const Ydb::StatusIds::StatusCode status, @@ -72,6 +71,12 @@ class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor< Y_UNUSED(errorCode); Y_UNUSED(status); Y_UNUSED(ctx); + + THolder<TEvKafka::TEvCommitedOffsetsResponse> response(new TEvKafka::TEvCommitedOffsetsResponse()); + response->TopicName = TopicName; + response->Status = TEvKafka::TEvCommitedOffsetsResponse::EStatus::ERROR; + Send(Requester, response.Release()); + Die(ctx); } void ApplyResponse(TTabletInfo& tabletInfo, @@ -86,7 +91,7 @@ class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor< consumerToOffset[consumerResult.GetConsumer()] = consumerResult.GetCommitedOffset(); } } - (*PartitionIdToOffsets_)[partResult.GetPartition()] = consumerToOffset; + (*PartitionIdToOffsets)[partResult.GetPartition()] = consumerToOffset; } }; @@ -99,17 +104,24 @@ class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor< Y_UNUSED(ev); }; - // Noop + // Should never be called bool ApplyResponse(NKikimr::TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr& ev, const TActorContext& ctx) override { Y_UNUSED(ctx); Y_UNUSED(ev); - return true; + Y_ABORT(); }; void HandleCacheNavigateResponse(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) override { const auto& response = ev->Get()->Request.Get()->ResultSet.front(); - Y_ABORT_UNLESS(response.PQGroupInfo); + if (!response.PQGroupInfo) { + THolder<TEvKafka::TEvCommitedOffsetsResponse> response(new TEvKafka::TEvCommitedOffsetsResponse()); + response->TopicName = TopicName; + response->Status = TEvKafka::TEvCommitedOffsetsResponse::EStatus::UNKNOWN_TOPIC; + Send(Requester, response.Release()); + TActorBootstrapped::PassAway(); + return; + } const auto& pqDescr = response.PQGroupInfo->Description; ProcessTablets(pqDescr, ActorContext()); @@ -117,17 +129,18 @@ class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor< void Reply(const TActorContext& ctx) override { THolder<TEvKafka::TEvCommitedOffsetsResponse> response(new TEvKafka::TEvCommitedOffsetsResponse()); - response->TopicName = TopicName_; - response->PartitionIdToOffsets = PartitionIdToOffsets_; - Send(Requester_, response.Release()); + response->TopicName = TopicName; + response->Status = TEvKafka::TEvCommitedOffsetsResponse::EStatus::OK; + response->PartitionIdToOffsets = PartitionIdToOffsets; + Send(Requester, response.Release()); Die(ctx); }; private: - TActorId Requester_; - TString TopicName_; - std::unordered_map<ui32, ui32> PartitionIdToOffset_ {}; - std::shared_ptr<std::unordered_map<ui32, std::unordered_map<TString, ui32>>> PartitionIdToOffsets_ = std::make_shared<std::unordered_map<ui32, std::unordered_map<TString, ui32>>>(); + TActorId Requester; + TString TopicName; + std::unordered_map<ui32, ui32> PartitionIdToOffset {}; + std::shared_ptr<std::unordered_map<ui32, std::unordered_map<TString, ui32>>> PartitionIdToOffsets = std::make_shared<std::unordered_map<ui32, std::unordered_map<TString, ui32>>>(); }; NActors::IActor* CreateKafkaOffsetFetchActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TOffsetFetchRequestData>& message) { @@ -143,7 +156,27 @@ TOffsetFetchResponseData::TPtr TKafkaOffsetFetchActor::GetOffsetFetchResponse() TOffsetFetchResponseData::TOffsetFetchResponseGroup::TOffsetFetchResponseTopics topic; TString topicName = requestTopic.Name.value(); topic.Name = topicName; - auto partitionsToOffsets = TopicToOffsets_[topicName]; + if (UnknownTopics.contains(topicName)) { + for (auto requestPartition: requestTopic.PartitionIndexes) { + TOffsetFetchResponseData::TOffsetFetchResponseGroup::TOffsetFetchResponseTopics::TOffsetFetchResponsePartitions partition; + partition.PartitionIndex = requestPartition; + partition.ErrorCode = UNKNOWN_TOPIC_OR_PARTITION; + topic.Partitions.push_back(partition); + } + group.Topics.push_back(topic); + continue; + } + if (ErroredTopics.contains(topicName)) { + for (auto requestPartition: requestTopic.PartitionIndexes) { + TOffsetFetchResponseData::TOffsetFetchResponseGroup::TOffsetFetchResponseTopics::TOffsetFetchResponsePartitions partition; + partition.PartitionIndex = requestPartition; + partition.ErrorCode = UNKNOWN_SERVER_ERROR; + topic.Partitions.push_back(partition); + } + group.Topics.push_back(topic); + continue; + } + auto partitionsToOffsets = TopicToOffsets[topicName]; for (auto requestPartition: requestTopic.PartitionIndexes) { TOffsetFetchResponseData::TOffsetFetchResponseGroup::TOffsetFetchResponseTopics::TOffsetFetchResponsePartitions partition; partition.PartitionIndex = requestPartition; @@ -169,12 +202,12 @@ TOffsetFetchResponseData::TPtr TKafkaOffsetFetchActor::GetOffsetFetchResponse() partition.PartitionIndex = sourcePartition.PartitionIndex; partition.ErrorCode = sourcePartition.ErrorCode; } + response->Topics.push_back(topic); } } return response; } - void TKafkaOffsetFetchActor::Bootstrap(const NActors::TActorContext& ctx) { // If API level <= 7, Groups would be empty. In this case we convert message to level 8 and process it uniformely later if (Message->Groups.empty()) { @@ -196,7 +229,7 @@ void TKafkaOffsetFetchActor::Bootstrap(const NActors::TActorContext& ctx) { } } - for (const auto& topicToEntities : TopicToEntities_) { + for (const auto& topicToEntities : TopicToEntities) { NKikimr::NGRpcProxy::V1::TGetPartitionsLocationRequest locationRequest{}; locationRequest.Topic = topicToEntities.first; locationRequest.Token = Context->UserToken->GetSerializedToken(); @@ -207,7 +240,7 @@ void TKafkaOffsetFetchActor::Bootstrap(const NActors::TActorContext& ctx) { SelfId(), topicToEntities.second.Partitions )); - InflyTopics_++; + InflyTopics++; } Become(&TKafkaOffsetFetchActor::StateWork); } @@ -215,27 +248,26 @@ void TKafkaOffsetFetchActor::Bootstrap(const NActors::TActorContext& ctx) { void TKafkaOffsetFetchActor::ExtractPartitions(const TString& group, const NKafka::TOffsetFetchRequestData::TOffsetFetchRequestGroup::TOffsetFetchRequestTopics& topic) { TString topicName = topic.Name.value(); - if (!TopicToEntities_.contains(topicName)) { + if (!TopicToEntities.contains(topicName)) { TopicEntities newEntities; - TopicToEntities_[topicName] = newEntities; + TopicToEntities[topicName] = newEntities; } - TopicEntities& entities = TopicToEntities_[topicName]; + TopicEntities& entities = TopicToEntities[topicName]; entities.Consumers->insert(group); for (auto partition: topic.PartitionIndexes) { entities.Partitions->insert(partition); } }; -void TKafkaOffsetFetchActor::StateWork(TAutoPtr<IEventHandle>& ev) { - switch (ev->GetTypeRewrite()) { - HFunc(TEvKafka::TEvCommitedOffsetsResponse, Handle); - } -} - void TKafkaOffsetFetchActor::Handle(TEvKafka::TEvCommitedOffsetsResponse::TPtr& ev, const TActorContext& ctx) { - InflyTopics_--; - TopicToOffsets_[ev->Get()->TopicName] = ev->Get()->PartitionIdToOffsets; - if (InflyTopics_ == 0) { + InflyTopics--; + TopicToOffsets[ev->Get()->TopicName] = ev->Get()->PartitionIdToOffsets; + if (ev->Get()->Status == TEvKafka::TEvCommitedOffsetsResponse::ERROR) { + ErroredTopics.insert(ev->Get()->TopicName); + } else if (ev->Get()->Status == TEvKafka::TEvCommitedOffsetsResponse::UNKNOWN_TOPIC) { + UnknownTopics.insert(ev->Get()->TopicName); + } + if (InflyTopics == 0) { auto response = GetOffsetFetchResponse(); Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, static_cast<EKafkaErrors>(response->ErrorCode))); Die(ctx); diff --git a/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.h b/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.h index 09d01cc0669..ec7f1075df7 100644 --- a/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.h @@ -19,18 +19,27 @@ public: } void Bootstrap(const NActors::TActorContext& ctx); - void StateWork(TAutoPtr<IEventHandle>& ev); + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + HFunc(TEvKafka::TEvCommitedOffsetsResponse, Handle); + } + } + void Handle(TEvKafka::TEvCommitedOffsetsResponse::TPtr& ev, const TActorContext& ctx); void ExtractPartitions(const TString& group, const NKafka::TOffsetFetchRequestData::TOffsetFetchRequestGroup::TOffsetFetchRequestTopics& topic); TOffsetFetchResponseData::TPtr GetOffsetFetchResponse(); + void ReplyError(const TActorContext& ctx); private: const TContext::TPtr Context; const ui64 CorrelationId; const TMessagePtr<TOffsetFetchRequestData> Message; - std::unordered_map<TString, TopicEntities> TopicToEntities_; - std::unordered_map<TString, std::shared_ptr<std::unordered_map<ui32, std::unordered_map<TString, ui32>>>> TopicToOffsets_; - ui32 InflyTopics_ = 0; + std::unordered_map<TString, TopicEntities> TopicToEntities; + std::unordered_map<TString, std::shared_ptr<std::unordered_map<ui32, std::unordered_map<TString, ui32>>>> TopicToOffsets; + std::set<TString> UnknownTopics; + std::set<TString> ErroredTopics; + ui32 InflyTopics = 0; }; diff --git a/ydb/core/kafka_proxy/kafka_events.h b/ydb/core/kafka_proxy/kafka_events.h index dc09b269a4e..e0ddb88a2bc 100644 --- a/ydb/core/kafka_proxy/kafka_events.h +++ b/ydb/core/kafka_proxy/kafka_events.h @@ -210,10 +210,17 @@ struct TEvTopicOffsetsResponse : public NActors::TEventLocal<TEvTopicOffsetsResp struct TEvCommitedOffsetsResponse : public NActors::TEventLocal<TEvCommitedOffsetsResponse, EvTopicOffsetsResponse> , public NKikimr::NGRpcProxy::V1::TEvPQProxy::TLocalResponseBase { + enum EStatus { + OK, + ERROR, + UNKNOWN_TOPIC, + }; + TEvCommitedOffsetsResponse() {} TString TopicName; + EStatus Status; std::shared_ptr<std::unordered_map<ui32, std::unordered_map<TString, ui32>>> PartitionIdToOffsets; }; diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp index f41777db79f..382650aa210 100644 --- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp +++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp @@ -1208,6 +1208,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { ui64 minActivePartitions = 10; TString consumerName = "consumer-0"; + TString consumer1Name = "consumer-1"; TString key = "record-key"; TString value = "record-value"; @@ -1221,6 +1222,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { .CreateTopic(topicName, NYdb::NTopic::TCreateTopicSettings() .BeginAddConsumer(consumerName).EndAddConsumer() + .BeginAddConsumer(consumer1Name).EndAddConsumer() .PartitioningSettings(minActivePartitions, 100)) .ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); @@ -1271,6 +1273,36 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { } { + // Commit offset for consumer-0 + auto settings = NTopic::TReadSessionSettings() + .AppendTopics(NTopic::TTopicReadSettings(topicName)) + .ConsumerName("consumer-0"); + auto topicReader = pqClient.CreateReadSession(settings); + + auto m = Read(topicReader); + UNIT_ASSERT_EQUAL(m.size(), 1); + + UNIT_ASSERT_EQUAL(m[0].GetMessages().size(), 1); + auto& m0 = m[0].GetMessages()[0]; + m0.Commit(); + } + + { + // Commit offset for consumer-1 + auto settings = NTopic::TReadSessionSettings() + .AppendTopics(NTopic::TTopicReadSettings(topicName)) + .ConsumerName("consumer-1"); + auto topicReader = pqClient.CreateReadSession(settings); + + auto m = Read(topicReader); + UNIT_ASSERT_EQUAL(m.size(), 1); + + UNIT_ASSERT_EQUAL(m[0].GetMessages().size(), 1); + auto& m0 = m[0].GetMessages()[0]; + m0.Commit(); + } + + { // Check commited offset after produce std::map<TString, std::vector<i32>> topicsToPartions; topicsToPartions[topicName] = std::vector<i32>{0, 1, 2, 3}; @@ -1281,10 +1313,64 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { UNIT_ASSERT_VALUES_EQUAL(partitions.size(), 4); auto partition0 = std::find_if(partitions.begin(), partitions.end(), [](const auto& partition) { return partition.PartitionIndex == 0; }); UNIT_ASSERT_VALUES_UNEQUAL(partition0, partitions.end()); - // This check faled one time under asan, commented until I figure out the exact reason. - // UNIT_ASSERT_VALUES_EQUAL(partition0->CommittedOffset, 1); + UNIT_ASSERT_VALUES_EQUAL(partition0->CommittedOffset, 1); } + { + // Check with nonexistent topic + std::map<TString, std::vector<i32>> topicsToPartions; + topicsToPartions["nonexTopic"] = std::vector<i32>{0, 1}; + auto msg = client.OffsetFetch(consumerName, topicsToPartions); + UNIT_ASSERT_VALUES_EQUAL(msg->Groups.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Groups[0].Topics.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Groups[0].Topics[0].Partitions.size(), 2); + for (const auto& partition : msg->Groups[0].Topics[0].Partitions) { + UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, UNKNOWN_TOPIC_OR_PARTITION); + } + } + + { + // Check with nonexistent consumer + std::map<TString, std::vector<i32>> topicsToPartions; + topicsToPartions[topicName] = std::vector<i32>{0, 1}; + auto msg = client.OffsetFetch("nonexConsumer", topicsToPartions); + UNIT_ASSERT_VALUES_EQUAL(msg->Groups.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Groups[0].Topics.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Groups[0].Topics[0].Partitions.size(), 2); + for (const auto& partition : msg->Groups[0].Topics[0].Partitions) { + UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, RESOURCE_NOT_FOUND); + } + } + + { + // Check with 2 consumers + TOffsetFetchRequestData request; + + TOffsetFetchRequestData::TOffsetFetchRequestGroup::TOffsetFetchRequestTopics topic; + topic.Name = topicName; + auto partitionIndexes = std::vector<int>{0}; + topic.PartitionIndexes = partitionIndexes; + + TOffsetFetchRequestData::TOffsetFetchRequestGroup group0; + group0.GroupId = consumerName; + group0.Topics.push_back(topic); + request.Groups.push_back(group0); + + TOffsetFetchRequestData::TOffsetFetchRequestGroup group1; + group1.GroupId = consumer1Name; + group1.Topics.push_back(topic); + request.Groups.push_back(group1); + + auto msg = client.OffsetFetch(request); + + UNIT_ASSERT_VALUES_EQUAL(msg->Groups.size(), 2); + for (const auto& group: msg->Groups) { + UNIT_ASSERT_VALUES_EQUAL(group.Topics.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(group.Topics[0].Partitions.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(group.Topics[0].Partitions[0].CommittedOffset, 1); + UNIT_ASSERT_VALUES_EQUAL(group.Topics[0].Partitions[0].ErrorCode, NONE_ERROR); + } + } } // Y_UNIT_TEST(OffsetFetchScenario) Y_UNIT_TEST(LoginWithApiKey) { |
