diff options
author | savnik <savnik@yandex-team.com> | 2023-09-05 15:10:05 +0300 |
---|---|---|
committer | savnik <savnik@yandex-team.com> | 2023-09-05 16:02:23 +0300 |
commit | 6565b8f5094b692e78fc5c08098a894bdd714d6a (patch) | |
tree | 2a306c59d7cba3ed6695a2781699c23adf726988 | |
parent | f44e7736680ef37b668a9439145f03213d45782c (diff) | |
download | ydb-6565b8f5094b692e78fc5c08098a894bdd714d6a.tar.gz |
Fix kafka auth
-rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp | 38 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h | 9 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/persqueue_utils.h | 10 |
3 files changed, 26 insertions, 31 deletions
diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp index fa5edb69f4e..ff33aa2ae25 100644 --- a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp @@ -4,17 +4,19 @@ #include <ydb/core/base/ticket_parser.h> #include <ydb/core/kafka_proxy/kafka_events.h> #include <ydb/core/tx/scheme_board/subscriber.h> +#include <ydb/services/persqueue_v1/actors/persqueue_utils.h> #include <library/cpp/actors/core/actor.h> #include "kafka_sasl_auth_actor.h" + namespace NKafka { static constexpr char EMPTY_AUTH_BYTES[] = ""; NActors::IActor* CreateKafkaSaslAuthActor(const TContext::TPtr context, const ui64 correlationId, const NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const TMessagePtr<TSaslAuthenticateRequestData>& message) { return new TKafkaSaslAuthActor(context, correlationId, address, message); -} +} void TKafkaSaslAuthActor::Bootstrap(const NActors::TActorContext& ctx) { if (Context->AuthenticationStep != EAuthSteps::WAIT_AUTH) { @@ -22,11 +24,11 @@ void TKafkaSaslAuthActor::Bootstrap(const NActors::TActorContext& ctx) { "Request is not valid given the current SASL state.", TStringBuilder() << "Current step: " << static_cast<int>(Context->AuthenticationStep), ctx); - return; + return; } if (Context->SaslMechanism != "PLAIN") { - SendResponseAndDie(EKafkaErrors::UNSUPPORTED_SASL_MECHANISM, - "Does not support the requested SASL mechanism.", + SendResponseAndDie(EKafkaErrors::UNSUPPORTED_SASL_MECHANISM, + "Does not support the requested SASL mechanism.", TStringBuilder() << "Requested mechanism '" << Context->SaslMechanism << "'", ctx); return; @@ -36,13 +38,11 @@ void TKafkaSaslAuthActor::Bootstrap(const NActors::TActorContext& ctx) { } void TKafkaSaslAuthActor::StartPlainAuth(const NActors::TActorContext& ctx) { - TAuthData authData; - if (!TryParseAuthDataTo(authData, ctx)) { + if (!TryParseAuthDataTo(ClientAuthData, ctx)) { return; } - DatabasePath = CanonizePath(authData.Database); - SendLoginRequest(authData, ctx); + DatabasePath = CanonizePath(ClientAuthData.Database); SendDescribeRequest(ctx); } @@ -52,27 +52,20 @@ void TKafkaSaslAuthActor::Handle(NKikimr::TEvTicketParser::TEvAuthorizeTicketRes return; } - Authentificated = true; UserToken = ev->Get()->Token; - ReplyIfReady(ctx); + SendResponseAndDie(EKafkaErrors::NONE_ERROR, "", "", ctx); } void TKafkaSaslAuthActor::Handle(TEvPrivate::TEvTokenReady::TPtr& ev, const NActors::TActorContext& /*ctx*/) { + auto entries = NKikimr::NGRpcProxy::V1::GetTicketParserEntries(DatabaseId, FolderId); Send(NKikimr::MakeTicketParserID(), new NKikimr::TEvTicketParser::TEvAuthorizeTicket({ .Database = ev->Get()->Database, .Ticket = ev->Get()->LoginResult.token(), .PeerName = TStringBuilder() << Address, + .Entries = entries })); } -void TKafkaSaslAuthActor::ReplyIfReady(const NActors::TActorContext& ctx) { - if (!Authentificated || !Described) { - return; - } - - SendResponseAndDie(EKafkaErrors::NONE_ERROR, "", "", ctx); -} - void TKafkaSaslAuthActor::SendResponseAndDie(EKafkaErrors errorCode, const TString& errorMessage, const TString& details, const NActors::TActorContext& ctx) { auto isFailed = errorCode != EKafkaErrors::NONE_ERROR; @@ -100,7 +93,7 @@ void TKafkaSaslAuthActor::SendResponseAndDie(EKafkaErrors errorCode, const TStri auto authResult = new TEvKafka::TEvAuthResult(EAuthSteps::SUCCESS, evResponse, UserToken, DatabasePath, DatabaseId, FolderId, CloudId, ServiceAccountId, Coordinator, ResourcePath, errorMessage); Send(Context->ConnectionId, authResult); } - + Die(ctx); } @@ -109,7 +102,7 @@ void TKafkaSaslAuthActor::Handle(TEvPrivate::TEvAuthFailed::TPtr& ev, const NAct } bool TKafkaSaslAuthActor::TryParseAuthDataTo(TKafkaSaslAuthActor::TAuthData& authData, const NActors::TActorContext& ctx) { - if (!AuthenticateRequestData->AuthBytes.has_value()) { + if (!AuthenticateRequestData->AuthBytes.has_value()) { SendResponseAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "", "AuthBytes is empty.", ctx); return false; } @@ -130,7 +123,7 @@ bool TKafkaSaslAuthActor::TryParseAuthDataTo(TKafkaSaslAuthActor::TAuthData& aut SendResponseAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "Database not provided.", "", ctx); return false; } - + authData.UserName = userAndDatabase.substr(0, atPos); authData.Database = userAndDatabase.substr(atPos + 1); authData.Password = password; @@ -190,8 +183,7 @@ void TKafkaSaslAuthActor::Handle(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeyS if (attr.first == "serverless_rt_coordination_node_path") Coordinator = attr.second; if (attr.first == "serverless_rt_base_resource_ru") ResourcePath = attr.second; } - Described = true; - ReplyIfReady(ctx); + SendLoginRequest(ClientAuthData, ctx); } } // NKafka diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h index ec8f71d52bb..47b0f37285a 100644 --- a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h @@ -13,7 +13,7 @@ namespace NKafka { -using namespace NKikimr; +using namespace NKikimr; class TKafkaSaslAuthActor: public NActors::TActorBootstrapped<TKafkaSaslAuthActor> { @@ -73,8 +73,6 @@ private: void SendDescribeRequest(const NActors::TActorContext& ctx); bool TryParseAuthDataTo(TKafkaSaslAuthActor::TAuthData& authData, const NActors::TActorContext& ctx); void SendResponseAndDie(EKafkaErrors errorCode, const TString& errorMessage, const TString& details, const NActors::TActorContext& ctx); - - void ReplyIfReady(const NActors::TActorContext& ctx); private: const TContext::TPtr Context; @@ -83,6 +81,8 @@ private: const TMessagePtr<TSaslAuthenticateRequestData> AuthenticateRequestData; const NKikimr::NRawSocket::TNetworkConfig::TSocketAddressType Address; + TAuthData ClientAuthData; + TString DatabasePath; TIntrusiveConstPtr<NACLib::TUserToken> UserToken; TString FolderId; @@ -91,9 +91,6 @@ private: TString Coordinator; TString ResourcePath; TString CloudId; - - bool Authentificated = false; - bool Described = false; }; } // NKafka diff --git a/ydb/services/persqueue_v1/actors/persqueue_utils.h b/ydb/services/persqueue_v1/actors/persqueue_utils.h index 7d8246bfddc..fd54f4d82f9 100644 --- a/ydb/services/persqueue_v1/actors/persqueue_utils.h +++ b/ydb/services/persqueue_v1/actors/persqueue_utils.h @@ -73,8 +73,14 @@ void FillIssue(Ydb::Issue::IssueMessage* issue, const Ydb::PersQueue::ErrorCode: static inline TVector<TEvTicketParser::TEvAuthorizeTicket::TEntry> GetTicketParserEntries(const TString& dbId, const TString& folderId) { - static const TVector<TString> permissions = {"ydb.streams.write", "ydb.databases.list", - "ydb.databases.create", "ydb.databases.connect"}; + static const TVector<TString> permissions = { + "ydb.databases.list", + "ydb.databases.create", + "ydb.databases.connect", + "ydb.tables.select", + "ydb.schemas.getMetadata", + "ydb.streams.write" + }; TVector<std::pair<TString, TString>> attributes; if (!dbId.empty()) attributes.push_back({"database_id", dbId}); if (!folderId.empty()) attributes.push_back({"folder_id", folderId}); |