diff options
author | tesseract <tesseract@yandex-team.com> | 2023-09-08 20:43:33 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-09-08 21:10:14 +0300 |
commit | a7323013e847f0d927e4231cb21daf32dce3b6c1 (patch) | |
tree | aa772b5e030c4e48520fac9a9aeec1fbdacc4c7e | |
parent | cc56e53ff13c7ed7efc104c0d3c29851afe9414f (diff) | |
download | ydb-a7323013e847f0d927e4231cb21daf32dce3b6c1.tar.gz |
fix for auth race in kafka
fix test for https://a.yandex-team.ru/review/4474884/details
5 files changed, 46 insertions, 19 deletions
diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp index 89ab305b5a..e095fd11a7 100644 --- a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp @@ -44,14 +44,6 @@ void TKafkaSaslAuthActor::StartPlainAuth(const NActors::TActorContext& ctx) { DatabasePath = CanonizePath(ClientAuthData.Database); - if (ClientAuthData.UserName.Empty()) { - // ApiKey IAM authentification - SendApiKeyRequest(); - } else { - // Login/Password authentification - SendLoginRequest(ClientAuthData, ctx); - } - SendDescribeRequest(ctx); } @@ -60,18 +52,29 @@ void TKafkaSaslAuthActor::Handle(NKikimr::TEvTicketParser::TEvAuthorizeTicketRes SendResponseAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "", ev->Get()->Error.Message, ctx); return; } - UserToken = ev->Get()->Token; + + if (KafkaApiFlag == "1") { // cloud mode + bool gotPermission = false; + for (auto & sid : UserToken->GetGroupSIDs()) { + if (sid == "ydb.api.kafka@as") { + gotPermission = true; + } + } + if (!gotPermission) { + SendResponseAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "", "no permission 'ydb.api.kafka'", ctx); + return; + } + } + SendResponseAndDie(EKafkaErrors::NONE_ERROR, "", "", ctx); } void TKafkaSaslAuthActor::Handle(TEvPrivate::TEvTokenReady::TPtr& ev, const NActors::TActorContext& /*ctx*/) { - auto entries = NKikimr::NGRpcProxy::V1::GetTicketParserEntries(DatabaseId, FolderId); Send(NKikimr::MakeTicketParserID(), new NKikimr::TEvTicketParser::TEvAuthorizeTicket({ .Database = ev->Get()->Database, .Ticket = "Login " + ev->Get()->LoginResult.token(), .PeerName = TStringBuilder() << Address, - .Entries = entries })); } @@ -99,7 +102,8 @@ void TKafkaSaslAuthActor::SendResponseAndDie(EKafkaErrors errorCode, const TStri responseToClient->ErrorMessage = ""; auto evResponse = std::make_shared<TEvKafka::TEvResponse>(CorrelationId, responseToClient, errorCode); - auto authResult = new TEvKafka::TEvAuthResult(EAuthSteps::SUCCESS, evResponse, UserToken, DatabasePath, DatabaseId, FolderId, CloudId, ServiceAccountId, Coordinator, ResourcePath, IsServerless, errorMessage); + auto authResult = new TEvKafka::TEvAuthResult(EAuthSteps::SUCCESS, evResponse, UserToken, DatabasePath, DatabaseId, FolderId, CloudId, ServiceAccountId, Coordinator, + ResourcePath, IsServerless, errorMessage); Send(Context->ConnectionId, authResult); } @@ -168,10 +172,13 @@ void TKafkaSaslAuthActor::SendLoginRequest(TKafkaSaslAuthActor::TAuthData authDa } void TKafkaSaslAuthActor::SendApiKeyRequest() { + auto entries = NKikimr::NGRpcProxy::V1::GetTicketParserEntries(DatabaseId, FolderId, true); + Send(NKikimr::MakeTicketParserID(), new NKikimr::TEvTicketParser::TEvAuthorizeTicket({ .Database = DatabasePath, .Ticket = "ApiKey " + ClientAuthData.Password, .PeerName = TStringBuilder() << Address, + .Entries = entries })); } @@ -193,6 +200,7 @@ void TKafkaSaslAuthActor::Handle(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeyS } Y_VERIFY(navigate->ResultSet.size() == 1); IsServerless = navigate->ResultSet.front().DomainInfo->IsServerless(); + for (const auto& attr : navigate->ResultSet.front().Attributes) { if (attr.first == "folder_id") FolderId = attr.second; if (attr.first == "cloud_id") CloudId = attr.second; @@ -200,8 +208,22 @@ void TKafkaSaslAuthActor::Handle(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeyS if (attr.first == "service_account_id") ServiceAccountId = attr.second; if (attr.first == "serverless_rt_coordination_node_path") Coordinator = attr.second; if (attr.first == "serverless_rt_base_resource_ru") ResourcePath = attr.second; + if (attr.first == "kafka_api") KafkaApiFlag = attr.second; + } + + + if (ClientAuthData.UserName.Empty()) { + // ApiKey IAM authentification + + if (KafkaApiFlag != "1" && KafkaApiFlag != "2") { + SendResponseAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "", TStringBuilder() << "kafka_api is not allowed on this database", ctx); + return; + } + SendApiKeyRequest(); + } else { + // Login/Password authentification + SendLoginRequest(ClientAuthData, ctx); } - SendLoginRequest(ClientAuthData, ctx); } } // NKafka diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h index 6a13e97ab4..8f484c16bb 100644 --- a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h @@ -92,6 +92,7 @@ private: TString Coordinator; TString ResourcePath; TString CloudId; + TString KafkaApiFlag; bool IsServerless; }; diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp index ef17732685..43938b9439 100644 --- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp +++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp @@ -60,7 +60,7 @@ class TTestServer { public: TIpPort Port; - TTestServer() { + TTestServer(const TString& kafkaApiMode = "1") { TPortManager portManager; Port = portManager.GetTcpPort(); @@ -159,6 +159,7 @@ public: client.AlterUserAttributes("/", "Root", {{"folder_id", DEFAULT_FOLDER_ID}, {"cloud_id", DEFAULT_CLOUD_ID}, + {"kafka_api", kafkaApiMode}, {"database_id", "root"}, {"serverless_rt_coordination_node_path", "/Coordinator/Root"}, {"serverless_rt_base_resource_ru", "/ru_Root"}})); @@ -415,7 +416,7 @@ private: Y_UNIT_TEST_SUITE(KafkaProtocol) { Y_UNIT_TEST(ProduceScenario) { - TInsecureTestServer testServer; + TInsecureTestServer testServer("2"); TString topicName = "/Root/topic-0-test"; @@ -657,7 +658,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { { auto msg = client.SaslAuthenticate("@/Root", "ApiKey-value-valid"); - + Cerr << msg->ErrorMessage << "\n"; UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); } diff --git a/ydb/library/testlib/service_mocks/access_service_mock.h b/ydb/library/testlib/service_mocks/access_service_mock.h index 62ad5e1c04..a9419a1e5e 100644 --- a/ydb/library/testlib/service_mocks/access_service_mock.h +++ b/ydb/library/testlib/service_mocks/access_service_mock.h @@ -133,7 +133,7 @@ public: } } - THashSet<TString> AllowedUserPermissions = {"user1-something.read", "ApiKey-value-valid-something.read"}; + THashSet<TString> AllowedUserPermissions = {"user1-something.read", "ApiKey-value-valid-something.read", "ApiKey-value-valid-ydb.api.kafka"}; THashMap<TString, TString> AllowedServicePermissions = {{"service1-something.write", "root1/folder1"}}; THashSet<TString> AllowedResourceIds = {}; THashSet<TString> UnavailableUserPermissions; diff --git a/ydb/services/persqueue_v1/actors/persqueue_utils.h b/ydb/services/persqueue_v1/actors/persqueue_utils.h index fd54f4d82f..4ee653b1ae 100644 --- a/ydb/services/persqueue_v1/actors/persqueue_utils.h +++ b/ydb/services/persqueue_v1/actors/persqueue_utils.h @@ -72,8 +72,8 @@ static inline bool InternalErrorCode(Ydb::PersQueue::ErrorCode::ErrorCode errorC void FillIssue(Ydb::Issue::IssueMessage* issue, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode, const TString& errorReason); -static inline TVector<TEvTicketParser::TEvAuthorizeTicket::TEntry> GetTicketParserEntries(const TString& dbId, const TString& folderId) { - static const TVector<TString> permissions = { +static inline TVector<TEvTicketParser::TEvAuthorizeTicket::TEntry> GetTicketParserEntries(const TString& dbId, const TString& folderId, bool useKafkaApi = false) { + TVector<TString> permissions = { "ydb.databases.list", "ydb.databases.create", "ydb.databases.connect", @@ -81,6 +81,9 @@ static inline TVector<TEvTicketParser::TEvAuthorizeTicket::TEntry> GetTicketPar "ydb.schemas.getMetadata", "ydb.streams.write" }; + if (useKafkaApi) { + permissions.push_back("ydb.api.kafka"); + } TVector<std::pair<TString, TString>> attributes; if (!dbId.empty()) attributes.push_back({"database_id", dbId}); if (!folderId.empty()) attributes.push_back({"folder_id", folderId}); |