aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsavnik <savnik@yandex-team.com>2023-09-05 15:10:05 +0300
committersavnik <savnik@yandex-team.com>2023-09-05 16:02:23 +0300
commit6565b8f5094b692e78fc5c08098a894bdd714d6a (patch)
tree2a306c59d7cba3ed6695a2781699c23adf726988
parentf44e7736680ef37b668a9439145f03213d45782c (diff)
downloadydb-6565b8f5094b692e78fc5c08098a894bdd714d6a.tar.gz
Fix kafka auth
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp38
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h9
-rw-r--r--ydb/services/persqueue_v1/actors/persqueue_utils.h10
3 files changed, 26 insertions, 31 deletions
diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
index fa5edb69f4e..ff33aa2ae25 100644
--- a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
@@ -4,17 +4,19 @@
#include <ydb/core/base/ticket_parser.h>
#include <ydb/core/kafka_proxy/kafka_events.h>
#include <ydb/core/tx/scheme_board/subscriber.h>
+#include <ydb/services/persqueue_v1/actors/persqueue_utils.h>
#include <library/cpp/actors/core/actor.h>
#include "kafka_sasl_auth_actor.h"
+
namespace NKafka {
static constexpr char EMPTY_AUTH_BYTES[] = "";
NActors::IActor* CreateKafkaSaslAuthActor(const TContext::TPtr context, const ui64 correlationId, const NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const TMessagePtr<TSaslAuthenticateRequestData>& message) {
return new TKafkaSaslAuthActor(context, correlationId, address, message);
-}
+}
void TKafkaSaslAuthActor::Bootstrap(const NActors::TActorContext& ctx) {
if (Context->AuthenticationStep != EAuthSteps::WAIT_AUTH) {
@@ -22,11 +24,11 @@ void TKafkaSaslAuthActor::Bootstrap(const NActors::TActorContext& ctx) {
"Request is not valid given the current SASL state.",
TStringBuilder() << "Current step: " << static_cast<int>(Context->AuthenticationStep),
ctx);
- return;
+ return;
}
if (Context->SaslMechanism != "PLAIN") {
- SendResponseAndDie(EKafkaErrors::UNSUPPORTED_SASL_MECHANISM,
- "Does not support the requested SASL mechanism.",
+ SendResponseAndDie(EKafkaErrors::UNSUPPORTED_SASL_MECHANISM,
+ "Does not support the requested SASL mechanism.",
TStringBuilder() << "Requested mechanism '" << Context->SaslMechanism << "'",
ctx);
return;
@@ -36,13 +38,11 @@ void TKafkaSaslAuthActor::Bootstrap(const NActors::TActorContext& ctx) {
}
void TKafkaSaslAuthActor::StartPlainAuth(const NActors::TActorContext& ctx) {
- TAuthData authData;
- if (!TryParseAuthDataTo(authData, ctx)) {
+ if (!TryParseAuthDataTo(ClientAuthData, ctx)) {
return;
}
- DatabasePath = CanonizePath(authData.Database);
- SendLoginRequest(authData, ctx);
+ DatabasePath = CanonizePath(ClientAuthData.Database);
SendDescribeRequest(ctx);
}
@@ -52,27 +52,20 @@ void TKafkaSaslAuthActor::Handle(NKikimr::TEvTicketParser::TEvAuthorizeTicketRes
return;
}
- Authentificated = true;
UserToken = ev->Get()->Token;
- ReplyIfReady(ctx);
+ SendResponseAndDie(EKafkaErrors::NONE_ERROR, "", "", ctx);
}
void TKafkaSaslAuthActor::Handle(TEvPrivate::TEvTokenReady::TPtr& ev, const NActors::TActorContext& /*ctx*/) {
+ auto entries = NKikimr::NGRpcProxy::V1::GetTicketParserEntries(DatabaseId, FolderId);
Send(NKikimr::MakeTicketParserID(), new NKikimr::TEvTicketParser::TEvAuthorizeTicket({
.Database = ev->Get()->Database,
.Ticket = ev->Get()->LoginResult.token(),
.PeerName = TStringBuilder() << Address,
+ .Entries = entries
}));
}
-void TKafkaSaslAuthActor::ReplyIfReady(const NActors::TActorContext& ctx) {
- if (!Authentificated || !Described) {
- return;
- }
-
- SendResponseAndDie(EKafkaErrors::NONE_ERROR, "", "", ctx);
-}
-
void TKafkaSaslAuthActor::SendResponseAndDie(EKafkaErrors errorCode, const TString& errorMessage, const TString& details, const NActors::TActorContext& ctx) {
auto isFailed = errorCode != EKafkaErrors::NONE_ERROR;
@@ -100,7 +93,7 @@ void TKafkaSaslAuthActor::SendResponseAndDie(EKafkaErrors errorCode, const TStri
auto authResult = new TEvKafka::TEvAuthResult(EAuthSteps::SUCCESS, evResponse, UserToken, DatabasePath, DatabaseId, FolderId, CloudId, ServiceAccountId, Coordinator, ResourcePath, errorMessage);
Send(Context->ConnectionId, authResult);
}
-
+
Die(ctx);
}
@@ -109,7 +102,7 @@ void TKafkaSaslAuthActor::Handle(TEvPrivate::TEvAuthFailed::TPtr& ev, const NAct
}
bool TKafkaSaslAuthActor::TryParseAuthDataTo(TKafkaSaslAuthActor::TAuthData& authData, const NActors::TActorContext& ctx) {
- if (!AuthenticateRequestData->AuthBytes.has_value()) {
+ if (!AuthenticateRequestData->AuthBytes.has_value()) {
SendResponseAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "", "AuthBytes is empty.", ctx);
return false;
}
@@ -130,7 +123,7 @@ bool TKafkaSaslAuthActor::TryParseAuthDataTo(TKafkaSaslAuthActor::TAuthData& aut
SendResponseAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "Database not provided.", "", ctx);
return false;
}
-
+
authData.UserName = userAndDatabase.substr(0, atPos);
authData.Database = userAndDatabase.substr(atPos + 1);
authData.Password = password;
@@ -190,8 +183,7 @@ void TKafkaSaslAuthActor::Handle(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeyS
if (attr.first == "serverless_rt_coordination_node_path") Coordinator = attr.second;
if (attr.first == "serverless_rt_base_resource_ru") ResourcePath = attr.second;
}
- Described = true;
- ReplyIfReady(ctx);
+ SendLoginRequest(ClientAuthData, ctx);
}
} // NKafka
diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h
index ec8f71d52bb..47b0f37285a 100644
--- a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h
+++ b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h
@@ -13,7 +13,7 @@
namespace NKafka {
-using namespace NKikimr;
+using namespace NKikimr;
class TKafkaSaslAuthActor: public NActors::TActorBootstrapped<TKafkaSaslAuthActor> {
@@ -73,8 +73,6 @@ private:
void SendDescribeRequest(const NActors::TActorContext& ctx);
bool TryParseAuthDataTo(TKafkaSaslAuthActor::TAuthData& authData, const NActors::TActorContext& ctx);
void SendResponseAndDie(EKafkaErrors errorCode, const TString& errorMessage, const TString& details, const NActors::TActorContext& ctx);
-
- void ReplyIfReady(const NActors::TActorContext& ctx);
private:
const TContext::TPtr Context;
@@ -83,6 +81,8 @@ private:
const TMessagePtr<TSaslAuthenticateRequestData> AuthenticateRequestData;
const NKikimr::NRawSocket::TNetworkConfig::TSocketAddressType Address;
+ TAuthData ClientAuthData;
+
TString DatabasePath;
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
TString FolderId;
@@ -91,9 +91,6 @@ private:
TString Coordinator;
TString ResourcePath;
TString CloudId;
-
- bool Authentificated = false;
- bool Described = false;
};
} // NKafka
diff --git a/ydb/services/persqueue_v1/actors/persqueue_utils.h b/ydb/services/persqueue_v1/actors/persqueue_utils.h
index 7d8246bfddc..fd54f4d82f9 100644
--- a/ydb/services/persqueue_v1/actors/persqueue_utils.h
+++ b/ydb/services/persqueue_v1/actors/persqueue_utils.h
@@ -73,8 +73,14 @@ void FillIssue(Ydb::Issue::IssueMessage* issue, const Ydb::PersQueue::ErrorCode:
static inline TVector<TEvTicketParser::TEvAuthorizeTicket::TEntry> GetTicketParserEntries(const TString& dbId, const TString& folderId) {
- static const TVector<TString> permissions = {"ydb.streams.write", "ydb.databases.list",
- "ydb.databases.create", "ydb.databases.connect"};
+ static const TVector<TString> permissions = {
+ "ydb.databases.list",
+ "ydb.databases.create",
+ "ydb.databases.connect",
+ "ydb.tables.select",
+ "ydb.schemas.getMetadata",
+ "ydb.streams.write"
+ };
TVector<std::pair<TString, TString>> attributes;
if (!dbId.empty()) attributes.push_back({"database_id", dbId});
if (!folderId.empty()) attributes.push_back({"folder_id", folderId});