summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAndrey Serebryanskiy <[email protected]>2025-02-04 19:19:20 +0300
committerGitHub <[email protected]>2025-02-04 16:19:20 +0000
commitf6e8a7446933528fca0a51a7502cbc08e19f0ea4 (patch)
treeff32ebefde86db9f3a98595eee2c5034f9b859bf
parent4ce5a23e57206e5a1ab03a831b987198b33533a8 (diff)
Kafka api better client errors (#13929)
-rw-r--r--ydb/core/kafka_proxy/actors/actors.h4
-rw-r--r--ydb/core/kafka_proxy/kafka_connection.cpp4
-rw-r--r--ydb/core/kafka_proxy/ut/ut_protocol.cpp7
-rw-r--r--ydb/public/api/protos/persqueue_error_codes_v1.proto2
-rw-r--r--ydb/services/persqueue_v1/actors/persqueue_utils.cpp1
-rw-r--r--ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp2
6 files changed, 13 insertions, 7 deletions
diff --git a/ydb/core/kafka_proxy/actors/actors.h b/ydb/core/kafka_proxy/actors/actors.h
index 3a07f1dd871..5102144fd98 100644
--- a/ydb/core/kafka_proxy/actors/actors.h
+++ b/ydb/core/kafka_proxy/actors/actors.h
@@ -52,7 +52,9 @@ struct TContext {
NKikimr::NPQ::TRlContext RlContext;
bool Authenticated() {
+
return !RequireAuthentication || AuthenticationStep == SUCCESS;
+
}
TActorId DiscoveryCacheActor;
@@ -128,6 +130,8 @@ inline EKafkaErrors ConvertErrorCode(Ydb::PersQueue::ErrorCode::ErrorCode code)
switch (code) {
case Ydb::PersQueue::ErrorCode::ErrorCode::OK:
return EKafkaErrors::NONE_ERROR;
+ case Ydb::PersQueue::ErrorCode::ErrorCode::UNKNOWN_READ_RULE:
+ return EKafkaErrors::GROUP_ID_NOT_FOUND;
case Ydb::PersQueue::ErrorCode::ErrorCode::BAD_REQUEST:
return EKafkaErrors::INVALID_REQUEST;
case Ydb::PersQueue::ErrorCode::ErrorCode::ERROR:
diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp
index 0a0b2aa29a4..223cc901348 100644
--- a/ydb/core/kafka_proxy/kafka_connection.cpp
+++ b/ydb/core/kafka_proxy/kafka_connection.cpp
@@ -101,9 +101,9 @@ public:
void Bootstrap() {
Context->ConnectionId = SelfId();
+ Context->RequireAuthentication = NKikimr::AppData()->EnforceUserTokenRequirement;
// if no authentication required, then we can use local database as our target
- if (!NKikimr::AppData()->EnforceUserTokenRequirement) {
- Context->RequireAuthentication = false;
+ if (!Context->RequireAuthentication) {
Context->DatabasePath = NKikimr::AppData()->TenantName;
}
diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
index cd6639a4f45..ed9556688ea 100644
--- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp
+++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
@@ -1066,7 +1066,6 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
NYdb::NTopic::TTopicClient pqClient(*testServer.Driver);
CreateTopic(pqClient, topicName, minActivePartitions, {});
- CreateTopic(pqClient, topicName, minActivePartitions, {});
TTestClient client(testServer.Port);
@@ -1482,7 +1481,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
topics.push_back(topicName);
auto joinResponse = clientA.JoinGroup(topics, notExistsGroup);
- UNIT_ASSERT_VALUES_EQUAL(joinResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::INVALID_REQUEST)); // because TReadInitAndAuthActor returns BAD_REQUEST on failed readRule check
+ UNIT_ASSERT_VALUES_EQUAL(joinResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::GROUP_ID_NOT_FOUND));
}
{
@@ -1649,7 +1648,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
UNIT_ASSERT_VALUES_EQUAL(msg->Topics.back().Partitions.size(), minActivePartitions);
for (const auto& topic : msg->Topics) {
for (const auto& partition : topic.Partitions) {
- UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::INVALID_REQUEST));
+ UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::GROUP_ID_NOT_FOUND));
}
}
}
@@ -1681,7 +1680,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
UNIT_ASSERT_VALUES_EQUAL(msg->Topics.back().Partitions.size(), minActivePartitions);
for (const auto& topic : msg->Topics) {
for (const auto& partition : topic.Partitions) {
- UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::INVALID_REQUEST));
+ UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::GROUP_ID_NOT_FOUND));
}
}
}
diff --git a/ydb/public/api/protos/persqueue_error_codes_v1.proto b/ydb/public/api/protos/persqueue_error_codes_v1.proto
index c11c3fad8da..65ef172ff00 100644
--- a/ydb/public/api/protos/persqueue_error_codes_v1.proto
+++ b/ydb/public/api/protos/persqueue_error_codes_v1.proto
@@ -59,4 +59,6 @@ enum ErrorCode {
INVALID_ARGUMENT = 500040;
VALIDATION_ERROR = 500080;
+
+ UNKNOWN_READ_RULE = 500032;
}
diff --git a/ydb/services/persqueue_v1/actors/persqueue_utils.cpp b/ydb/services/persqueue_v1/actors/persqueue_utils.cpp
index d26012063a7..4af068fff99 100644
--- a/ydb/services/persqueue_v1/actors/persqueue_utils.cpp
+++ b/ydb/services/persqueue_v1/actors/persqueue_utils.cpp
@@ -111,6 +111,7 @@ Ydb::StatusIds::StatusCode ConvertPersQueueInternalCodeToStatus(const Ydb::PersQ
case READ_ERROR_TOO_BIG_OFFSET:
case SET_OFFSET_ERROR_COMMIT_TO_FUTURE:
case SET_OFFSET_ERROR_COMMIT_TO_PAST:
+ case UNKNOWN_READ_RULE:
return Ydb::StatusIds::BAD_REQUEST;
case WRONG_COOKIE:
case CREATE_SESSION_ALREADY_LOCKED:
diff --git a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp
index b263f5acdd0..ca65e50c184 100644
--- a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp
+++ b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp
@@ -203,7 +203,7 @@ bool TReadInitAndAuthActor::CheckTopicACL(
if (!NPQ::HasConsumer(pqDescr.GetPQTabletConfig(), ClientId)) {
CloseSession(
TStringBuilder() << "no read rule provided for consumer '" << ClientPath << "' in topic '" << topic << "' in current cluster '" << LocalCluster << "'",
- PersQueue::ErrorCode::BAD_REQUEST, ctx
+ PersQueue::ErrorCode::UNKNOWN_READ_RULE, ctx
);
return false;
}