aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-09-12 18:19:07 +0300
committertesseract <tesseract@yandex-team.com>2023-09-12 19:36:08 +0300
commit3c0e6f878bc910f07298391b0944cd6d90a265ea (patch)
tree640bbf47508e65896254ad73182866b21583a272
parentf9df42fa4e9b160a1a3c72589846109cd086b03a (diff)
downloadydb-3c0e6f878bc910f07298391b0944cd6d90a265ea.tar.gz
Fix kafka auth logic
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp29
-rw-r--r--ydb/library/testlib/service_mocks/access_service_mock.h2
-rw-r--r--ydb/services/persqueue_v1/actors/persqueue_utils.h5
3 files changed, 18 insertions, 18 deletions
diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
index e095fd11a73..97ebe5e5e53 100644
--- a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
@@ -12,7 +12,8 @@
namespace NKafka {
-static constexpr char EMPTY_AUTH_BYTES[] = "";
+static constexpr char EmptyAuthBytes[] = "";
+static constexpr char DebugKafkaApiFlagValue[] = "2";
NActors::IActor* CreateKafkaSaslAuthActor(const TContext::TPtr context, const ui64 correlationId, const NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const TMessagePtr<TSaslAuthenticateRequestData>& message) {
return new TKafkaSaslAuthActor(context, correlationId, address, message);
@@ -54,15 +55,16 @@ void TKafkaSaslAuthActor::Handle(NKikimr::TEvTicketParser::TEvAuthorizeTicketRes
}
UserToken = ev->Get()->Token;
- if (KafkaApiFlag == "1") { // cloud mode
+ if (KafkaApiFlag != DebugKafkaApiFlagValue) {
bool gotPermission = false;
for (auto & sid : UserToken->GetGroupSIDs()) {
- if (sid == "ydb.api.kafka@as") {
+ if (sid == NKikimr::NGRpcProxy::V1::KafkaPlainAuthSid) {
gotPermission = true;
+ break;
}
}
if (!gotPermission) {
- SendResponseAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "", "no permission 'ydb.api.kafka'", ctx);
+ SendResponseAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "", TStringBuilder() << "no permission '" << NKikimr::NGRpcProxy::V1::KafkaPlainAuthPermission << "'", ctx);
return;
}
}
@@ -83,7 +85,7 @@ void TKafkaSaslAuthActor::SendResponseAndDie(EKafkaErrors errorCode, const TStri
auto responseToClient = std::make_shared<TSaslAuthenticateResponseData>();
responseToClient->ErrorCode = errorCode;
- responseToClient->AuthBytes = TKafkaRawBytes(EMPTY_AUTH_BYTES, sizeof(EMPTY_AUTH_BYTES));
+ responseToClient->AuthBytes = TKafkaRawBytes(EmptyAuthBytes, sizeof(EmptyAuthBytes));
if (isFailed) {
KAFKA_LOG_ERROR("Authentication failure. " << errorMessage << " " << details);
@@ -203,22 +205,17 @@ void TKafkaSaslAuthActor::Handle(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeyS
for (const auto& attr : navigate->ResultSet.front().Attributes) {
if (attr.first == "folder_id") FolderId = attr.second;
- if (attr.first == "cloud_id") CloudId = attr.second;
- if (attr.first == "database_id") DatabaseId = attr.second;
- if (attr.first == "service_account_id") ServiceAccountId = attr.second;
- if (attr.first == "serverless_rt_coordination_node_path") Coordinator = attr.second;
- if (attr.first == "serverless_rt_base_resource_ru") ResourcePath = attr.second;
- if (attr.first == "kafka_api") KafkaApiFlag = attr.second;
+ else if (attr.first == "cloud_id") CloudId = attr.second;
+ else if (attr.first == "database_id") DatabaseId = attr.second;
+ else if (attr.first == "service_account_id") ServiceAccountId = attr.second;
+ else if (attr.first == "serverless_rt_coordination_node_path") Coordinator = attr.second;
+ else if (attr.first == "serverless_rt_base_resource_ru") ResourcePath = attr.second;
+ else if (attr.first == "kafka_api") KafkaApiFlag = attr.second;
}
if (ClientAuthData.UserName.Empty()) {
// ApiKey IAM authentification
-
- if (KafkaApiFlag != "1" && KafkaApiFlag != "2") {
- SendResponseAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "", TStringBuilder() << "kafka_api is not allowed on this database", ctx);
- return;
- }
SendApiKeyRequest();
} else {
// Login/Password authentification
diff --git a/ydb/library/testlib/service_mocks/access_service_mock.h b/ydb/library/testlib/service_mocks/access_service_mock.h
index a9419a1e5ea..e684744a7f8 100644
--- a/ydb/library/testlib/service_mocks/access_service_mock.h
+++ b/ydb/library/testlib/service_mocks/access_service_mock.h
@@ -133,7 +133,7 @@ public:
}
}
- THashSet<TString> AllowedUserPermissions = {"user1-something.read", "ApiKey-value-valid-something.read", "ApiKey-value-valid-ydb.api.kafka"};
+ THashSet<TString> AllowedUserPermissions = {"user1-something.read", "ApiKey-value-valid-something.read", "ApiKey-value-valid-ydb.api.kafkaPlainAuth"};
THashMap<TString, TString> AllowedServicePermissions = {{"service1-something.write", "root1/folder1"}};
THashSet<TString> AllowedResourceIds = {};
THashSet<TString> UnavailableUserPermissions;
diff --git a/ydb/services/persqueue_v1/actors/persqueue_utils.h b/ydb/services/persqueue_v1/actors/persqueue_utils.h
index 4ee653b1aea..b6151ba88f7 100644
--- a/ydb/services/persqueue_v1/actors/persqueue_utils.h
+++ b/ydb/services/persqueue_v1/actors/persqueue_utils.h
@@ -15,6 +15,9 @@ namespace NKikimr::NGRpcProxy::V1 {
#endif
#define PQ_LOG_PREFIX "session cookie " << Cookie << " consumer " << ClientPath << " session " << Session
+static constexpr char KafkaPlainAuthPermission[] = "ydb.api.kafkaPlainAuth";
+static constexpr char KafkaPlainAuthSid[] = "ydb.api.kafkaPlainAuth@as";
+
// moved to ydb/core/client/server/msgbus_server_persqueue.h?
// const TString& TopicPrefix(const TActorContext& ctx);
@@ -82,7 +85,7 @@ static inline TVector<TEvTicketParser::TEvAuthorizeTicket::TEntry> GetTicketPar
"ydb.streams.write"
};
if (useKafkaApi) {
- permissions.push_back("ydb.api.kafka");
+ permissions.push_back(KafkaPlainAuthPermission);
}
TVector<std::pair<TString, TString>> attributes;
if (!dbId.empty()) attributes.push_back({"database_id", dbId});