summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsergeyveselov <[email protected]>2023-12-11 23:13:46 +0300
committersergeyveselov <[email protected]>2023-12-11 23:44:41 +0300
commitb8c6f928ac8754689d03f515c41d55073326a5fd (patch)
tree0636f7ff2bc22807fd0f5457deb76ed965b15f57
parent37e0c276d8f499c053b0071970902cee318a1700 (diff)
Handle RaiseError calls in OffsetFetchActor
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp92
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.h17
-rw-r--r--ydb/core/kafka_proxy/kafka_events.h7
-rw-r--r--ydb/core/kafka_proxy/ut/ut_protocol.cpp90
4 files changed, 170 insertions, 36 deletions
diff --git a/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp
index 77cb1c151f4..b6d537aed64 100644
--- a/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp
@@ -41,8 +41,8 @@ class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor<
std::shared_ptr<TSet<ui32>> partitions)
: TBase(request, requester)
, TDescribeTopicActorImpl(ConsumerOffsetSettings(consumers, partitions))
- , Requester_(requester)
- , TopicName_(request.Topic)
+ , Requester(requester)
+ , TopicName(request.Topic)
{
Y_UNUSED(requester);
};
@@ -63,7 +63,6 @@ class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor<
}
}
- // Noop
void RaiseError(const TString& error,
const Ydb::PersQueue::ErrorCode::ErrorCode errorCode,
const Ydb::StatusIds::StatusCode status,
@@ -72,6 +71,12 @@ class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor<
Y_UNUSED(errorCode);
Y_UNUSED(status);
Y_UNUSED(ctx);
+
+ THolder<TEvKafka::TEvCommitedOffsetsResponse> response(new TEvKafka::TEvCommitedOffsetsResponse());
+ response->TopicName = TopicName;
+ response->Status = TEvKafka::TEvCommitedOffsetsResponse::EStatus::ERROR;
+ Send(Requester, response.Release());
+ Die(ctx);
}
void ApplyResponse(TTabletInfo& tabletInfo,
@@ -86,7 +91,7 @@ class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor<
consumerToOffset[consumerResult.GetConsumer()] = consumerResult.GetCommitedOffset();
}
}
- (*PartitionIdToOffsets_)[partResult.GetPartition()] = consumerToOffset;
+ (*PartitionIdToOffsets)[partResult.GetPartition()] = consumerToOffset;
}
};
@@ -99,17 +104,24 @@ class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor<
Y_UNUSED(ev);
};
- // Noop
+ // Should never be called
bool ApplyResponse(NKikimr::TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr& ev,
const TActorContext& ctx) override {
Y_UNUSED(ctx);
Y_UNUSED(ev);
- return true;
+ Y_ABORT();
};
void HandleCacheNavigateResponse(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) override {
const auto& response = ev->Get()->Request.Get()->ResultSet.front();
- Y_ABORT_UNLESS(response.PQGroupInfo);
+ if (!response.PQGroupInfo) {
+ THolder<TEvKafka::TEvCommitedOffsetsResponse> response(new TEvKafka::TEvCommitedOffsetsResponse());
+ response->TopicName = TopicName;
+ response->Status = TEvKafka::TEvCommitedOffsetsResponse::EStatus::UNKNOWN_TOPIC;
+ Send(Requester, response.Release());
+ TActorBootstrapped::PassAway();
+ return;
+ }
const auto& pqDescr = response.PQGroupInfo->Description;
ProcessTablets(pqDescr, ActorContext());
@@ -117,17 +129,18 @@ class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor<
void Reply(const TActorContext& ctx) override {
THolder<TEvKafka::TEvCommitedOffsetsResponse> response(new TEvKafka::TEvCommitedOffsetsResponse());
- response->TopicName = TopicName_;
- response->PartitionIdToOffsets = PartitionIdToOffsets_;
- Send(Requester_, response.Release());
+ response->TopicName = TopicName;
+ response->Status = TEvKafka::TEvCommitedOffsetsResponse::EStatus::OK;
+ response->PartitionIdToOffsets = PartitionIdToOffsets;
+ Send(Requester, response.Release());
Die(ctx);
};
private:
- TActorId Requester_;
- TString TopicName_;
- std::unordered_map<ui32, ui32> PartitionIdToOffset_ {};
- std::shared_ptr<std::unordered_map<ui32, std::unordered_map<TString, ui32>>> PartitionIdToOffsets_ = std::make_shared<std::unordered_map<ui32, std::unordered_map<TString, ui32>>>();
+ TActorId Requester;
+ TString TopicName;
+ std::unordered_map<ui32, ui32> PartitionIdToOffset {};
+ std::shared_ptr<std::unordered_map<ui32, std::unordered_map<TString, ui32>>> PartitionIdToOffsets = std::make_shared<std::unordered_map<ui32, std::unordered_map<TString, ui32>>>();
};
NActors::IActor* CreateKafkaOffsetFetchActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TOffsetFetchRequestData>& message) {
@@ -143,7 +156,27 @@ TOffsetFetchResponseData::TPtr TKafkaOffsetFetchActor::GetOffsetFetchResponse()
TOffsetFetchResponseData::TOffsetFetchResponseGroup::TOffsetFetchResponseTopics topic;
TString topicName = requestTopic.Name.value();
topic.Name = topicName;
- auto partitionsToOffsets = TopicToOffsets_[topicName];
+ if (UnknownTopics.contains(topicName)) {
+ for (auto requestPartition: requestTopic.PartitionIndexes) {
+ TOffsetFetchResponseData::TOffsetFetchResponseGroup::TOffsetFetchResponseTopics::TOffsetFetchResponsePartitions partition;
+ partition.PartitionIndex = requestPartition;
+ partition.ErrorCode = UNKNOWN_TOPIC_OR_PARTITION;
+ topic.Partitions.push_back(partition);
+ }
+ group.Topics.push_back(topic);
+ continue;
+ }
+ if (ErroredTopics.contains(topicName)) {
+ for (auto requestPartition: requestTopic.PartitionIndexes) {
+ TOffsetFetchResponseData::TOffsetFetchResponseGroup::TOffsetFetchResponseTopics::TOffsetFetchResponsePartitions partition;
+ partition.PartitionIndex = requestPartition;
+ partition.ErrorCode = UNKNOWN_SERVER_ERROR;
+ topic.Partitions.push_back(partition);
+ }
+ group.Topics.push_back(topic);
+ continue;
+ }
+ auto partitionsToOffsets = TopicToOffsets[topicName];
for (auto requestPartition: requestTopic.PartitionIndexes) {
TOffsetFetchResponseData::TOffsetFetchResponseGroup::TOffsetFetchResponseTopics::TOffsetFetchResponsePartitions partition;
partition.PartitionIndex = requestPartition;
@@ -169,12 +202,12 @@ TOffsetFetchResponseData::TPtr TKafkaOffsetFetchActor::GetOffsetFetchResponse()
partition.PartitionIndex = sourcePartition.PartitionIndex;
partition.ErrorCode = sourcePartition.ErrorCode;
}
+ response->Topics.push_back(topic);
}
}
return response;
}
-
void TKafkaOffsetFetchActor::Bootstrap(const NActors::TActorContext& ctx) {
// If API level <= 7, Groups would be empty. In this case we convert message to level 8 and process it uniformely later
if (Message->Groups.empty()) {
@@ -196,7 +229,7 @@ void TKafkaOffsetFetchActor::Bootstrap(const NActors::TActorContext& ctx) {
}
}
- for (const auto& topicToEntities : TopicToEntities_) {
+ for (const auto& topicToEntities : TopicToEntities) {
NKikimr::NGRpcProxy::V1::TGetPartitionsLocationRequest locationRequest{};
locationRequest.Topic = topicToEntities.first;
locationRequest.Token = Context->UserToken->GetSerializedToken();
@@ -207,7 +240,7 @@ void TKafkaOffsetFetchActor::Bootstrap(const NActors::TActorContext& ctx) {
SelfId(),
topicToEntities.second.Partitions
));
- InflyTopics_++;
+ InflyTopics++;
}
Become(&TKafkaOffsetFetchActor::StateWork);
}
@@ -215,27 +248,26 @@ void TKafkaOffsetFetchActor::Bootstrap(const NActors::TActorContext& ctx) {
void TKafkaOffsetFetchActor::ExtractPartitions(const TString& group, const NKafka::TOffsetFetchRequestData::TOffsetFetchRequestGroup::TOffsetFetchRequestTopics& topic) {
TString topicName = topic.Name.value();
- if (!TopicToEntities_.contains(topicName)) {
+ if (!TopicToEntities.contains(topicName)) {
TopicEntities newEntities;
- TopicToEntities_[topicName] = newEntities;
+ TopicToEntities[topicName] = newEntities;
}
- TopicEntities& entities = TopicToEntities_[topicName];
+ TopicEntities& entities = TopicToEntities[topicName];
entities.Consumers->insert(group);
for (auto partition: topic.PartitionIndexes) {
entities.Partitions->insert(partition);
}
};
-void TKafkaOffsetFetchActor::StateWork(TAutoPtr<IEventHandle>& ev) {
- switch (ev->GetTypeRewrite()) {
- HFunc(TEvKafka::TEvCommitedOffsetsResponse, Handle);
- }
-}
-
void TKafkaOffsetFetchActor::Handle(TEvKafka::TEvCommitedOffsetsResponse::TPtr& ev, const TActorContext& ctx) {
- InflyTopics_--;
- TopicToOffsets_[ev->Get()->TopicName] = ev->Get()->PartitionIdToOffsets;
- if (InflyTopics_ == 0) {
+ InflyTopics--;
+ TopicToOffsets[ev->Get()->TopicName] = ev->Get()->PartitionIdToOffsets;
+ if (ev->Get()->Status == TEvKafka::TEvCommitedOffsetsResponse::ERROR) {
+ ErroredTopics.insert(ev->Get()->TopicName);
+ } else if (ev->Get()->Status == TEvKafka::TEvCommitedOffsetsResponse::UNKNOWN_TOPIC) {
+ UnknownTopics.insert(ev->Get()->TopicName);
+ }
+ if (InflyTopics == 0) {
auto response = GetOffsetFetchResponse();
Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, static_cast<EKafkaErrors>(response->ErrorCode)));
Die(ctx);
diff --git a/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.h b/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.h
index 09d01cc0669..ec7f1075df7 100644
--- a/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.h
+++ b/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.h
@@ -19,18 +19,27 @@ public:
}
void Bootstrap(const NActors::TActorContext& ctx);
- void StateWork(TAutoPtr<IEventHandle>& ev);
+
+ STATEFN(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(TEvKafka::TEvCommitedOffsetsResponse, Handle);
+ }
+ }
+
void Handle(TEvKafka::TEvCommitedOffsetsResponse::TPtr& ev, const TActorContext& ctx);
void ExtractPartitions(const TString& group, const NKafka::TOffsetFetchRequestData::TOffsetFetchRequestGroup::TOffsetFetchRequestTopics& topic);
TOffsetFetchResponseData::TPtr GetOffsetFetchResponse();
+ void ReplyError(const TActorContext& ctx);
private:
const TContext::TPtr Context;
const ui64 CorrelationId;
const TMessagePtr<TOffsetFetchRequestData> Message;
- std::unordered_map<TString, TopicEntities> TopicToEntities_;
- std::unordered_map<TString, std::shared_ptr<std::unordered_map<ui32, std::unordered_map<TString, ui32>>>> TopicToOffsets_;
- ui32 InflyTopics_ = 0;
+ std::unordered_map<TString, TopicEntities> TopicToEntities;
+ std::unordered_map<TString, std::shared_ptr<std::unordered_map<ui32, std::unordered_map<TString, ui32>>>> TopicToOffsets;
+ std::set<TString> UnknownTopics;
+ std::set<TString> ErroredTopics;
+ ui32 InflyTopics = 0;
};
diff --git a/ydb/core/kafka_proxy/kafka_events.h b/ydb/core/kafka_proxy/kafka_events.h
index dc09b269a4e..e0ddb88a2bc 100644
--- a/ydb/core/kafka_proxy/kafka_events.h
+++ b/ydb/core/kafka_proxy/kafka_events.h
@@ -210,10 +210,17 @@ struct TEvTopicOffsetsResponse : public NActors::TEventLocal<TEvTopicOffsetsResp
struct TEvCommitedOffsetsResponse : public NActors::TEventLocal<TEvCommitedOffsetsResponse, EvTopicOffsetsResponse>
, public NKikimr::NGRpcProxy::V1::TEvPQProxy::TLocalResponseBase
{
+ enum EStatus {
+ OK,
+ ERROR,
+ UNKNOWN_TOPIC,
+ };
+
TEvCommitedOffsetsResponse()
{}
TString TopicName;
+ EStatus Status;
std::shared_ptr<std::unordered_map<ui32, std::unordered_map<TString, ui32>>> PartitionIdToOffsets;
};
diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
index f41777db79f..382650aa210 100644
--- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp
+++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
@@ -1208,6 +1208,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
ui64 minActivePartitions = 10;
TString consumerName = "consumer-0";
+ TString consumer1Name = "consumer-1";
TString key = "record-key";
TString value = "record-value";
@@ -1221,6 +1222,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
.CreateTopic(topicName,
NYdb::NTopic::TCreateTopicSettings()
.BeginAddConsumer(consumerName).EndAddConsumer()
+ .BeginAddConsumer(consumer1Name).EndAddConsumer()
.PartitioningSettings(minActivePartitions, 100))
.ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
@@ -1271,6 +1273,36 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
}
{
+ // Commit offset for consumer-0
+ auto settings = NTopic::TReadSessionSettings()
+ .AppendTopics(NTopic::TTopicReadSettings(topicName))
+ .ConsumerName("consumer-0");
+ auto topicReader = pqClient.CreateReadSession(settings);
+
+ auto m = Read(topicReader);
+ UNIT_ASSERT_EQUAL(m.size(), 1);
+
+ UNIT_ASSERT_EQUAL(m[0].GetMessages().size(), 1);
+ auto& m0 = m[0].GetMessages()[0];
+ m0.Commit();
+ }
+
+ {
+ // Commit offset for consumer-1
+ auto settings = NTopic::TReadSessionSettings()
+ .AppendTopics(NTopic::TTopicReadSettings(topicName))
+ .ConsumerName("consumer-1");
+ auto topicReader = pqClient.CreateReadSession(settings);
+
+ auto m = Read(topicReader);
+ UNIT_ASSERT_EQUAL(m.size(), 1);
+
+ UNIT_ASSERT_EQUAL(m[0].GetMessages().size(), 1);
+ auto& m0 = m[0].GetMessages()[0];
+ m0.Commit();
+ }
+
+ {
// Check commited offset after produce
std::map<TString, std::vector<i32>> topicsToPartions;
topicsToPartions[topicName] = std::vector<i32>{0, 1, 2, 3};
@@ -1281,10 +1313,64 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
UNIT_ASSERT_VALUES_EQUAL(partitions.size(), 4);
auto partition0 = std::find_if(partitions.begin(), partitions.end(), [](const auto& partition) { return partition.PartitionIndex == 0; });
UNIT_ASSERT_VALUES_UNEQUAL(partition0, partitions.end());
- // This check faled one time under asan, commented until I figure out the exact reason.
- // UNIT_ASSERT_VALUES_EQUAL(partition0->CommittedOffset, 1);
+ UNIT_ASSERT_VALUES_EQUAL(partition0->CommittedOffset, 1);
}
+ {
+ // Check with nonexistent topic
+ std::map<TString, std::vector<i32>> topicsToPartions;
+ topicsToPartions["nonexTopic"] = std::vector<i32>{0, 1};
+ auto msg = client.OffsetFetch(consumerName, topicsToPartions);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Groups.size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Groups[0].Topics.size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Groups[0].Topics[0].Partitions.size(), 2);
+ for (const auto& partition : msg->Groups[0].Topics[0].Partitions) {
+ UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, UNKNOWN_TOPIC_OR_PARTITION);
+ }
+ }
+
+ {
+ // Check with nonexistent consumer
+ std::map<TString, std::vector<i32>> topicsToPartions;
+ topicsToPartions[topicName] = std::vector<i32>{0, 1};
+ auto msg = client.OffsetFetch("nonexConsumer", topicsToPartions);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Groups.size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Groups[0].Topics.size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Groups[0].Topics[0].Partitions.size(), 2);
+ for (const auto& partition : msg->Groups[0].Topics[0].Partitions) {
+ UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, RESOURCE_NOT_FOUND);
+ }
+ }
+
+ {
+ // Check with 2 consumers
+ TOffsetFetchRequestData request;
+
+ TOffsetFetchRequestData::TOffsetFetchRequestGroup::TOffsetFetchRequestTopics topic;
+ topic.Name = topicName;
+ auto partitionIndexes = std::vector<int>{0};
+ topic.PartitionIndexes = partitionIndexes;
+
+ TOffsetFetchRequestData::TOffsetFetchRequestGroup group0;
+ group0.GroupId = consumerName;
+ group0.Topics.push_back(topic);
+ request.Groups.push_back(group0);
+
+ TOffsetFetchRequestData::TOffsetFetchRequestGroup group1;
+ group1.GroupId = consumer1Name;
+ group1.Topics.push_back(topic);
+ request.Groups.push_back(group1);
+
+ auto msg = client.OffsetFetch(request);
+
+ UNIT_ASSERT_VALUES_EQUAL(msg->Groups.size(), 2);
+ for (const auto& group: msg->Groups) {
+ UNIT_ASSERT_VALUES_EQUAL(group.Topics.size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(group.Topics[0].Partitions.size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(group.Topics[0].Partitions[0].CommittedOffset, 1);
+ UNIT_ASSERT_VALUES_EQUAL(group.Topics[0].Partitions[0].ErrorCode, NONE_ERROR);
+ }
+ }
} // Y_UNIT_TEST(OffsetFetchScenario)
Y_UNIT_TEST(LoginWithApiKey) {