diff options
author | tesseract <tesseract@yandex-team.com> | 2023-09-12 18:19:07 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-09-12 19:36:08 +0300 |
commit | 3c0e6f878bc910f07298391b0944cd6d90a265ea (patch) | |
tree | 640bbf47508e65896254ad73182866b21583a272 | |
parent | f9df42fa4e9b160a1a3c72589846109cd086b03a (diff) | |
download | ydb-3c0e6f878bc910f07298391b0944cd6d90a265ea.tar.gz |
Fix kafka auth logic
-rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp | 29 | ||||
-rw-r--r-- | ydb/library/testlib/service_mocks/access_service_mock.h | 2 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/persqueue_utils.h | 5 |
3 files changed, 18 insertions, 18 deletions
diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp index e095fd11a73..97ebe5e5e53 100644 --- a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp @@ -12,7 +12,8 @@ namespace NKafka { -static constexpr char EMPTY_AUTH_BYTES[] = ""; +static constexpr char EmptyAuthBytes[] = ""; +static constexpr char DebugKafkaApiFlagValue[] = "2"; NActors::IActor* CreateKafkaSaslAuthActor(const TContext::TPtr context, const ui64 correlationId, const NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const TMessagePtr<TSaslAuthenticateRequestData>& message) { return new TKafkaSaslAuthActor(context, correlationId, address, message); @@ -54,15 +55,16 @@ void TKafkaSaslAuthActor::Handle(NKikimr::TEvTicketParser::TEvAuthorizeTicketRes } UserToken = ev->Get()->Token; - if (KafkaApiFlag == "1") { // cloud mode + if (KafkaApiFlag != DebugKafkaApiFlagValue) { bool gotPermission = false; for (auto & sid : UserToken->GetGroupSIDs()) { - if (sid == "ydb.api.kafka@as") { + if (sid == NKikimr::NGRpcProxy::V1::KafkaPlainAuthSid) { gotPermission = true; + break; } } if (!gotPermission) { - SendResponseAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "", "no permission 'ydb.api.kafka'", ctx); + SendResponseAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "", TStringBuilder() << "no permission '" << NKikimr::NGRpcProxy::V1::KafkaPlainAuthPermission << "'", ctx); return; } } @@ -83,7 +85,7 @@ void TKafkaSaslAuthActor::SendResponseAndDie(EKafkaErrors errorCode, const TStri auto responseToClient = std::make_shared<TSaslAuthenticateResponseData>(); responseToClient->ErrorCode = errorCode; - responseToClient->AuthBytes = TKafkaRawBytes(EMPTY_AUTH_BYTES, sizeof(EMPTY_AUTH_BYTES)); + responseToClient->AuthBytes = TKafkaRawBytes(EmptyAuthBytes, sizeof(EmptyAuthBytes)); if (isFailed) { KAFKA_LOG_ERROR("Authentication failure. " << errorMessage << " " << details); @@ -203,22 +205,17 @@ void TKafkaSaslAuthActor::Handle(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeyS for (const auto& attr : navigate->ResultSet.front().Attributes) { if (attr.first == "folder_id") FolderId = attr.second; - if (attr.first == "cloud_id") CloudId = attr.second; - if (attr.first == "database_id") DatabaseId = attr.second; - if (attr.first == "service_account_id") ServiceAccountId = attr.second; - if (attr.first == "serverless_rt_coordination_node_path") Coordinator = attr.second; - if (attr.first == "serverless_rt_base_resource_ru") ResourcePath = attr.second; - if (attr.first == "kafka_api") KafkaApiFlag = attr.second; + else if (attr.first == "cloud_id") CloudId = attr.second; + else if (attr.first == "database_id") DatabaseId = attr.second; + else if (attr.first == "service_account_id") ServiceAccountId = attr.second; + else if (attr.first == "serverless_rt_coordination_node_path") Coordinator = attr.second; + else if (attr.first == "serverless_rt_base_resource_ru") ResourcePath = attr.second; + else if (attr.first == "kafka_api") KafkaApiFlag = attr.second; } if (ClientAuthData.UserName.Empty()) { // ApiKey IAM authentification - - if (KafkaApiFlag != "1" && KafkaApiFlag != "2") { - SendResponseAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "", TStringBuilder() << "kafka_api is not allowed on this database", ctx); - return; - } SendApiKeyRequest(); } else { // Login/Password authentification diff --git a/ydb/library/testlib/service_mocks/access_service_mock.h b/ydb/library/testlib/service_mocks/access_service_mock.h index a9419a1e5ea..e684744a7f8 100644 --- a/ydb/library/testlib/service_mocks/access_service_mock.h +++ b/ydb/library/testlib/service_mocks/access_service_mock.h @@ -133,7 +133,7 @@ public: } } - THashSet<TString> AllowedUserPermissions = {"user1-something.read", "ApiKey-value-valid-something.read", "ApiKey-value-valid-ydb.api.kafka"}; + THashSet<TString> AllowedUserPermissions = {"user1-something.read", "ApiKey-value-valid-something.read", "ApiKey-value-valid-ydb.api.kafkaPlainAuth"}; THashMap<TString, TString> AllowedServicePermissions = {{"service1-something.write", "root1/folder1"}}; THashSet<TString> AllowedResourceIds = {}; THashSet<TString> UnavailableUserPermissions; diff --git a/ydb/services/persqueue_v1/actors/persqueue_utils.h b/ydb/services/persqueue_v1/actors/persqueue_utils.h index 4ee653b1aea..b6151ba88f7 100644 --- a/ydb/services/persqueue_v1/actors/persqueue_utils.h +++ b/ydb/services/persqueue_v1/actors/persqueue_utils.h @@ -15,6 +15,9 @@ namespace NKikimr::NGRpcProxy::V1 { #endif #define PQ_LOG_PREFIX "session cookie " << Cookie << " consumer " << ClientPath << " session " << Session +static constexpr char KafkaPlainAuthPermission[] = "ydb.api.kafkaPlainAuth"; +static constexpr char KafkaPlainAuthSid[] = "ydb.api.kafkaPlainAuth@as"; + // moved to ydb/core/client/server/msgbus_server_persqueue.h? // const TString& TopicPrefix(const TActorContext& ctx); @@ -82,7 +85,7 @@ static inline TVector<TEvTicketParser::TEvAuthorizeTicket::TEntry> GetTicketPar "ydb.streams.write" }; if (useKafkaApi) { - permissions.push_back("ydb.api.kafka"); + permissions.push_back(KafkaPlainAuthPermission); } TVector<std::pair<TString, TString>> attributes; if (!dbId.empty()) attributes.push_back({"database_id", dbId}); |