aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-09-08 20:43:33 +0300
committertesseract <tesseract@yandex-team.com>2023-09-08 21:10:14 +0300
commita7323013e847f0d927e4231cb21daf32dce3b6c1 (patch)
treeaa772b5e030c4e48520fac9a9aeec1fbdacc4c7e
parentcc56e53ff13c7ed7efc104c0d3c29851afe9414f (diff)
downloadydb-a7323013e847f0d927e4231cb21daf32dce3b6c1.tar.gz
fix for auth race in kafka
fix test for https://a.yandex-team.ru/review/4474884/details
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp48
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h1
-rw-r--r--ydb/core/kafka_proxy/ut/ut_protocol.cpp7
-rw-r--r--ydb/library/testlib/service_mocks/access_service_mock.h2
-rw-r--r--ydb/services/persqueue_v1/actors/persqueue_utils.h7
5 files changed, 46 insertions, 19 deletions
diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
index 89ab305b5a..e095fd11a7 100644
--- a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
@@ -44,14 +44,6 @@ void TKafkaSaslAuthActor::StartPlainAuth(const NActors::TActorContext& ctx) {
DatabasePath = CanonizePath(ClientAuthData.Database);
- if (ClientAuthData.UserName.Empty()) {
- // ApiKey IAM authentification
- SendApiKeyRequest();
- } else {
- // Login/Password authentification
- SendLoginRequest(ClientAuthData, ctx);
- }
-
SendDescribeRequest(ctx);
}
@@ -60,18 +52,29 @@ void TKafkaSaslAuthActor::Handle(NKikimr::TEvTicketParser::TEvAuthorizeTicketRes
SendResponseAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "", ev->Get()->Error.Message, ctx);
return;
}
-
UserToken = ev->Get()->Token;
+
+ if (KafkaApiFlag == "1") { // cloud mode
+ bool gotPermission = false;
+ for (auto & sid : UserToken->GetGroupSIDs()) {
+ if (sid == "ydb.api.kafka@as") {
+ gotPermission = true;
+ }
+ }
+ if (!gotPermission) {
+ SendResponseAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "", "no permission 'ydb.api.kafka'", ctx);
+ return;
+ }
+ }
+
SendResponseAndDie(EKafkaErrors::NONE_ERROR, "", "", ctx);
}
void TKafkaSaslAuthActor::Handle(TEvPrivate::TEvTokenReady::TPtr& ev, const NActors::TActorContext& /*ctx*/) {
- auto entries = NKikimr::NGRpcProxy::V1::GetTicketParserEntries(DatabaseId, FolderId);
Send(NKikimr::MakeTicketParserID(), new NKikimr::TEvTicketParser::TEvAuthorizeTicket({
.Database = ev->Get()->Database,
.Ticket = "Login " + ev->Get()->LoginResult.token(),
.PeerName = TStringBuilder() << Address,
- .Entries = entries
}));
}
@@ -99,7 +102,8 @@ void TKafkaSaslAuthActor::SendResponseAndDie(EKafkaErrors errorCode, const TStri
responseToClient->ErrorMessage = "";
auto evResponse = std::make_shared<TEvKafka::TEvResponse>(CorrelationId, responseToClient, errorCode);
- auto authResult = new TEvKafka::TEvAuthResult(EAuthSteps::SUCCESS, evResponse, UserToken, DatabasePath, DatabaseId, FolderId, CloudId, ServiceAccountId, Coordinator, ResourcePath, IsServerless, errorMessage);
+ auto authResult = new TEvKafka::TEvAuthResult(EAuthSteps::SUCCESS, evResponse, UserToken, DatabasePath, DatabaseId, FolderId, CloudId, ServiceAccountId, Coordinator,
+ ResourcePath, IsServerless, errorMessage);
Send(Context->ConnectionId, authResult);
}
@@ -168,10 +172,13 @@ void TKafkaSaslAuthActor::SendLoginRequest(TKafkaSaslAuthActor::TAuthData authDa
}
void TKafkaSaslAuthActor::SendApiKeyRequest() {
+ auto entries = NKikimr::NGRpcProxy::V1::GetTicketParserEntries(DatabaseId, FolderId, true);
+
Send(NKikimr::MakeTicketParserID(), new NKikimr::TEvTicketParser::TEvAuthorizeTicket({
.Database = DatabasePath,
.Ticket = "ApiKey " + ClientAuthData.Password,
.PeerName = TStringBuilder() << Address,
+ .Entries = entries
}));
}
@@ -193,6 +200,7 @@ void TKafkaSaslAuthActor::Handle(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeyS
}
Y_VERIFY(navigate->ResultSet.size() == 1);
IsServerless = navigate->ResultSet.front().DomainInfo->IsServerless();
+
for (const auto& attr : navigate->ResultSet.front().Attributes) {
if (attr.first == "folder_id") FolderId = attr.second;
if (attr.first == "cloud_id") CloudId = attr.second;
@@ -200,8 +208,22 @@ void TKafkaSaslAuthActor::Handle(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeyS
if (attr.first == "service_account_id") ServiceAccountId = attr.second;
if (attr.first == "serverless_rt_coordination_node_path") Coordinator = attr.second;
if (attr.first == "serverless_rt_base_resource_ru") ResourcePath = attr.second;
+ if (attr.first == "kafka_api") KafkaApiFlag = attr.second;
+ }
+
+
+ if (ClientAuthData.UserName.Empty()) {
+ // ApiKey IAM authentification
+
+ if (KafkaApiFlag != "1" && KafkaApiFlag != "2") {
+ SendResponseAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "", TStringBuilder() << "kafka_api is not allowed on this database", ctx);
+ return;
+ }
+ SendApiKeyRequest();
+ } else {
+ // Login/Password authentification
+ SendLoginRequest(ClientAuthData, ctx);
}
- SendLoginRequest(ClientAuthData, ctx);
}
} // NKafka
diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h
index 6a13e97ab4..8f484c16bb 100644
--- a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h
+++ b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h
@@ -92,6 +92,7 @@ private:
TString Coordinator;
TString ResourcePath;
TString CloudId;
+ TString KafkaApiFlag;
bool IsServerless;
};
diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
index ef17732685..43938b9439 100644
--- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp
+++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
@@ -60,7 +60,7 @@ class TTestServer {
public:
TIpPort Port;
- TTestServer() {
+ TTestServer(const TString& kafkaApiMode = "1") {
TPortManager portManager;
Port = portManager.GetTcpPort();
@@ -159,6 +159,7 @@ public:
client.AlterUserAttributes("/", "Root",
{{"folder_id", DEFAULT_FOLDER_ID},
{"cloud_id", DEFAULT_CLOUD_ID},
+ {"kafka_api", kafkaApiMode},
{"database_id", "root"},
{"serverless_rt_coordination_node_path", "/Coordinator/Root"},
{"serverless_rt_base_resource_ru", "/ru_Root"}}));
@@ -415,7 +416,7 @@ private:
Y_UNIT_TEST_SUITE(KafkaProtocol) {
Y_UNIT_TEST(ProduceScenario) {
- TInsecureTestServer testServer;
+ TInsecureTestServer testServer("2");
TString topicName = "/Root/topic-0-test";
@@ -657,7 +658,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
{
auto msg = client.SaslAuthenticate("@/Root", "ApiKey-value-valid");
-
+ Cerr << msg->ErrorMessage << "\n";
UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
}
diff --git a/ydb/library/testlib/service_mocks/access_service_mock.h b/ydb/library/testlib/service_mocks/access_service_mock.h
index 62ad5e1c04..a9419a1e5e 100644
--- a/ydb/library/testlib/service_mocks/access_service_mock.h
+++ b/ydb/library/testlib/service_mocks/access_service_mock.h
@@ -133,7 +133,7 @@ public:
}
}
- THashSet<TString> AllowedUserPermissions = {"user1-something.read", "ApiKey-value-valid-something.read"};
+ THashSet<TString> AllowedUserPermissions = {"user1-something.read", "ApiKey-value-valid-something.read", "ApiKey-value-valid-ydb.api.kafka"};
THashMap<TString, TString> AllowedServicePermissions = {{"service1-something.write", "root1/folder1"}};
THashSet<TString> AllowedResourceIds = {};
THashSet<TString> UnavailableUserPermissions;
diff --git a/ydb/services/persqueue_v1/actors/persqueue_utils.h b/ydb/services/persqueue_v1/actors/persqueue_utils.h
index fd54f4d82f..4ee653b1ae 100644
--- a/ydb/services/persqueue_v1/actors/persqueue_utils.h
+++ b/ydb/services/persqueue_v1/actors/persqueue_utils.h
@@ -72,8 +72,8 @@ static inline bool InternalErrorCode(Ydb::PersQueue::ErrorCode::ErrorCode errorC
void FillIssue(Ydb::Issue::IssueMessage* issue, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode, const TString& errorReason);
-static inline TVector<TEvTicketParser::TEvAuthorizeTicket::TEntry> GetTicketParserEntries(const TString& dbId, const TString& folderId) {
- static const TVector<TString> permissions = {
+static inline TVector<TEvTicketParser::TEvAuthorizeTicket::TEntry> GetTicketParserEntries(const TString& dbId, const TString& folderId, bool useKafkaApi = false) {
+ TVector<TString> permissions = {
"ydb.databases.list",
"ydb.databases.create",
"ydb.databases.connect",
@@ -81,6 +81,9 @@ static inline TVector<TEvTicketParser::TEvAuthorizeTicket::TEntry> GetTicketPar
"ydb.schemas.getMetadata",
"ydb.streams.write"
};
+ if (useKafkaApi) {
+ permissions.push_back("ydb.api.kafka");
+ }
TVector<std::pair<TString, TString>> attributes;
if (!dbId.empty()) attributes.push_back({"database_id", dbId});
if (!folderId.empty()) attributes.push_back({"folder_id", folderId});