diff options
author | tesseract <tesseract@yandex-team.com> | 2023-08-14 13:14:56 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-08-14 14:37:42 +0300 |
commit | 9547d91f81ce434a8cf900b1562bc87c88abfb79 (patch) | |
tree | 11263be3b578270dbda4ccaca8f9faa170720bf5 | |
parent | febf300349ed585aa250bef06bfbdb91597a08bd (diff) | |
download | ydb-9547d91f81ce434a8cf900b1562bc87c88abfb79.tar.gz |
Improvements to PRODUCE
- Block non-authentificated requests except API_VERSION, SASL_HANDSHAKE and SASL_AUTHENTIFICATE
- All unauthentificated request process sequentially
- Use path id as topic id
- refactoring - extract Context
18 files changed, 176 insertions, 124 deletions
diff --git a/ydb/core/kafka_proxy/actors/actors.h b/ydb/core/kafka_proxy/actors/actors.h index 7b83a8f60cb..1b98abe4d9f 100644 --- a/ydb/core/kafka_proxy/actors/actors.h +++ b/ydb/core/kafka_proxy/actors/actors.h @@ -1,7 +1,51 @@ #pragma once +#include <ydb/core/protos/config.pb.h> +#include <ydb/library/aclib/aclib.h> + +#include "../kafka_messages.h" + namespace NKafka { - enum EAuthSteps { WAIT_HANDSHAKE, WAIT_AUTH, SUCCESS, FAILED }; +enum EAuthSteps { + WAIT_HANDSHAKE, + WAIT_AUTH, + SUCCESS, + FAILED +}; + +struct TContext { + using TPtr = std::shared_ptr<TContext>; + + TContext(const NKikimrConfig::TKafkaProxyConfig& config) + : Config(config) { + } + + const NKikimrConfig::TKafkaProxyConfig& Config; + + TActorId ConnectionId; + + + EAuthSteps AuthenticationStep = EAuthSteps::WAIT_HANDSHAKE; + TString SaslMechanism; + + TString Database; + TIntrusiveConstPtr<NACLib::TUserToken> UserToken; + TString ClientDC; + + bool Authenticated() { return AuthenticationStep == SUCCESS; } +}; + +inline bool RequireAuthentication(EApiKey apiKey) { + return !(EApiKey::API_VERSIONS == apiKey || EApiKey::SASL_HANDSHAKE == apiKey || EApiKey::SASL_AUTHENTICATE == apiKey); +} + +NActors::IActor* CreateKafkaApiVersionsActor(const TContext::TPtr context, const ui64 correlationId, const TApiVersionsRequestData* message); +NActors::IActor* CreateKafkaInitProducerIdActor(const TContext::TPtr context, const ui64 correlationId, const TInitProducerIdRequestData* message); +NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context, const ui64 correlationId, const TMetadataRequestData* message); +NActors::IActor* CreateKafkaProduceActor(const TContext::TPtr context); +NActors::IActor* CreateKafkaSaslHandshakeActor(const TContext::TPtr context, const ui64 correlationId, const TSaslHandshakeRequestData* message); +NActors::IActor* CreateKafkaSaslAuthActor(const TContext::TPtr context, const ui64 correlationId, const NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const TSaslAuthenticateRequestData* message); + } // namespace NKafka diff --git a/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp index 8837bcb230a..bb24a616156 100644 --- a/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp @@ -1,9 +1,11 @@ #include "kafka_api_versions_actor.h" +#include <ydb/core/kafka_proxy/kafka_events.h> + namespace NKafka { -NActors::IActor* CreateKafkaApiVersionsActor(const TActorId parent, const ui64 correlationId, const TApiVersionsRequestData* message) { - return new TKafkaApiVersionsActor(parent, correlationId, message); +NActors::IActor* CreateKafkaApiVersionsActor(const TContext::TPtr context, const ui64 correlationId, const TApiVersionsRequestData* message) { + return new TKafkaApiVersionsActor(context, correlationId, message); } TApiVersionsResponseData::TPtr GetApiVersions() { @@ -40,7 +42,7 @@ TApiVersionsResponseData::TPtr GetApiVersions() { void TKafkaApiVersionsActor::Bootstrap(const NActors::TActorContext& ctx) { Y_UNUSED(Message); - Send(Parent, new TEvKafka::TEvResponse(CorrelationId, GetApiVersions())); + Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, GetApiVersions())); Die(ctx); } diff --git a/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.h b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.h index 4a00822b864..69adc294f60 100644 --- a/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.h @@ -1,12 +1,13 @@ -#include "../kafka_events.h" +#include "actors.h" + #include <library/cpp/actors/core/actor_bootstrapped.h> namespace NKafka { class TKafkaApiVersionsActor: public NActors::TActorBootstrapped<TKafkaApiVersionsActor> { public: - TKafkaApiVersionsActor(const TActorId parent, const ui64 correlationId, const TApiVersionsRequestData* message) - : Parent(parent) + TKafkaApiVersionsActor(const TContext::TPtr context, const ui64 correlationId, const TApiVersionsRequestData* message) + : Context(context) , CorrelationId(correlationId) , Message(message) { } @@ -14,7 +15,7 @@ public: void Bootstrap(const NActors::TActorContext& ctx); private: - const TActorId Parent; + const TContext::TPtr Context; const ui64 CorrelationId; const TApiVersionsRequestData* Message; }; diff --git a/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp index b5ec20346b3..71bd8503fca 100644 --- a/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp @@ -1,16 +1,19 @@ #include "kafka_init_producer_id_actor.h" +#include <util/random/random.h> +#include <ydb/core/kafka_proxy/kafka_events.h> + namespace NKafka { -NActors::IActor* CreateKafkaInitProducerIdActor(const TActorId parent, const ui64 correlationId, const TInitProducerIdRequestData* message) { - return new TKafkaInitProducerIdActor(parent, correlationId, message); +NActors::IActor* CreateKafkaInitProducerIdActor(const TContext::TPtr context, const ui64 correlationId, const TInitProducerIdRequestData* message) { + return new TKafkaInitProducerIdActor(context, correlationId, message); } -TInitProducerIdResponseData::TPtr GetResponse() { +TInitProducerIdResponseData::TPtr GetResponse(const NActors::TActorContext& ctx) { TInitProducerIdResponseData::TPtr response = std::make_shared<TInitProducerIdResponseData>(); - response->ProducerEpoch = 1; - response->ProducerId = 1; + response->ProducerEpoch = 0; + response->ProducerId = ((ctx.Now().MilliSeconds() << 16) & 0x7FFFFFFFFFFF) + RandomNumber<ui16>(); response->ErrorCode = EKafkaErrors::NONE_ERROR; response->ThrottleTimeMs = 0; @@ -20,7 +23,7 @@ TInitProducerIdResponseData::TPtr GetResponse() { void TKafkaInitProducerIdActor::Bootstrap(const NActors::TActorContext& ctx) { Y_UNUSED(Message); - Send(Parent, new TEvKafka::TEvResponse(CorrelationId, GetResponse())); + Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, GetResponse(ctx))); Die(ctx); } diff --git a/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.h b/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.h index 78e5622d2a6..08e5f755f0a 100644 --- a/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.h @@ -1,12 +1,13 @@ -#include "../kafka_events.h" +#include "actors.h" + #include <library/cpp/actors/core/actor_bootstrapped.h> namespace NKafka { class TKafkaInitProducerIdActor: public NActors::TActorBootstrapped<TKafkaInitProducerIdActor> { public: - TKafkaInitProducerIdActor(const TActorId parent, const ui64 correlationId, const TInitProducerIdRequestData* message) - : Parent(parent) + TKafkaInitProducerIdActor(const TContext::TPtr context, const ui64 correlationId, const TInitProducerIdRequestData* message) + : Context(context) , CorrelationId(correlationId) , Message(message) { } @@ -14,7 +15,7 @@ public: void Bootstrap(const NActors::TActorContext& ctx); private: - const TActorId Parent; + const TContext::TPtr Context; const ui64 CorrelationId; const TInitProducerIdRequestData* Message; }; diff --git a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp index 9b251733ae9..0951a66e05c 100644 --- a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp @@ -1,21 +1,22 @@ #include "kafka_metadata_actor.h" + +#include <ydb/core/kafka_proxy/kafka_events.h> #include <ydb/services/persqueue_v1/actors/schema_actors.h> namespace NKafka { using namespace NKikimr::NGRpcProxy::V1; -NActors::IActor* CreateKafkaMetadataActor(const TActorId parent, - const NACLib::TUserToken* userToken, +NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context, const ui64 correlationId, - const TMetadataRequestData* message, - const NKikimrConfig::TKafkaProxyConfig& config) { - return new TKafkaMetadataActor(parent, userToken, correlationId, message, config); + const TMetadataRequestData* message) { + return new TKafkaMetadataActor(context, correlationId, message); } void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) { Response->Topics.resize(Message->Topics.size()); + THashMap<TString, TActorId> partitionActors; - for (auto i = 0u; i < Message->Topics.size(); ++i) { + for (size_t i = 0; i < Message->Topics.size(); ++i) { Response->Topics[i] = TMetadataResponseData::TMetadataResponseTopic{}; auto& reqTopic = Message->Topics[i]; Response->Topics[i].Name = reqTopic.Name.value_or(""); @@ -42,12 +43,12 @@ void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) { } TActorId TKafkaMetadataActor::SendTopicRequest(const TMetadataRequestData::TMetadataRequestTopic& topicRequest) { - KAFKA_LOG_D("Describe partitions locations for topic '" << *topicRequest.Name << "' for user '" << UserToken->GetUserSID() << "'"); + KAFKA_LOG_D("Describe partitions locations for topic '" << *topicRequest.Name << "' for user '" << Context->UserToken->GetUserSID() << "'"); TGetPartitionsLocationRequest locationRequest{}; locationRequest.Topic = topicRequest.Name.value(); - locationRequest.Token = UserToken->GetSerializedToken(); - locationRequest.Database = "/Root/test"; // TODO + locationRequest.Token = Context->UserToken->GetSerializedToken(); + locationRequest.Database = Context->Database; PendingResponses++; @@ -62,7 +63,7 @@ void TKafkaMetadataActor::AddTopicError( void TKafkaMetadataActor::AddTopicResponse(TMetadataResponseData::TMetadataResponseTopic& topic, TEvLocationResponse* response) { topic.ErrorCode = NONE_ERROR; - topic.TopicId = response->BalancerTabletId; + topic.TopicId = TKafkaUuid(response->SchemeShardId, response->PathId); topic.Partitions.reserve(response->Partitions.size()); for (const auto& part : response->Partitions) { TMetadataResponseData::TMetadataResponseTopic::PartitionsMeta::ItemType responsePartition; @@ -77,7 +78,7 @@ void TKafkaMetadataActor::AddTopicResponse(TMetadataResponseData::TMetadataRespo auto broker = TMetadataResponseData::TMetadataResponseBroker{}; broker.NodeId = part.NodeId; broker.Host = part.Hostname; - broker.Port = Config.GetListeningPort(); + broker.Port = Context->Config.GetListeningPort(); Response->Brokers.emplace_back(std::move(broker)); } topic.Partitions.emplace_back(std::move(responsePartition)); @@ -133,7 +134,7 @@ void TKafkaMetadataActor::HandleResponse(TEvLocationResponse::TPtr ev, const TAc void TKafkaMetadataActor::RespondIfRequired(const TActorContext& ctx) { if (PendingResponses == 0) { - Send(Parent, new TEvKafka::TEvResponse(CorrelationId, Response)); + Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, Response)); Die(ctx); } } diff --git a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h index e77617571d7..d55659d0ace 100644 --- a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h @@ -1,4 +1,5 @@ -#include "../kafka_events.h" +#include "actors.h" + #include <library/cpp/actors/core/actor_bootstrapped.h> #include <ydb/library/aclib/aclib.h> #include <ydb/services/persqueue_v1/actors/events.h> @@ -7,12 +8,10 @@ namespace NKafka { class TKafkaMetadataActor: public NActors::TActorBootstrapped<TKafkaMetadataActor> { public: - TKafkaMetadataActor(const TActorId& parent, const NACLib::TUserToken* userToken, const ui64 correlationId, const TMetadataRequestData* message, const NKikimrConfig::TKafkaProxyConfig& config) - : Parent(parent) - , UserToken(userToken) + TKafkaMetadataActor(const TContext::TPtr context, const ui64 correlationId, const TMetadataRequestData* message) + : Context(context) , CorrelationId(correlationId) , Message(message) - , Config(config) , Response(new TMetadataResponseData()) {} @@ -22,11 +21,12 @@ private: using TEvLocationResponse = NKikimr::NGRpcProxy::V1::TEvPQProxy::TEvPartitionLocationResponse; TActorId SendTopicRequest(const TMetadataRequestData::TMetadataRequestTopic& topicRequest); - void HandleResponse(TEvLocationResponse::TPtr ev, const TActorContext& ctx); + void HandleResponse(TEvLocationResponse::TPtr ev, const NActors::TActorContext& ctx); void AddTopicResponse(TMetadataResponseData::TMetadataResponseTopic& topic, TEvLocationResponse* response); void AddTopicError(TMetadataResponseData::TMetadataResponseTopic& topic, EKafkaErrors errorCode); - void RespondIfRequired(const TActorContext& ctx); + void RespondIfRequired(const NActors::TActorContext& ctx); + STATEFN(StateWork) { switch (ev->GetTypeRewrite()) { HFunc(TEvLocationResponse, HandleResponse); @@ -36,11 +36,9 @@ private: TString LogPrefix() const; private: - const TActorId Parent; - const NACLib::TUserToken* UserToken; + const TContext::TPtr Context; const ui64 CorrelationId; const TMetadataRequestData* Message; - const NKikimrConfig::TKafkaProxyConfig& Config; ui64 PendingResponses = 0; diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp index a039cecb6e5..7517277ae43 100644 --- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp @@ -15,8 +15,8 @@ static constexpr TDuration TOPIC_UNATHORIZED_EXPIRATION_INTERVAL = TDuration::Mi static constexpr TDuration REQUEST_EXPIRATION_INTERVAL = TDuration::Seconds(30); static constexpr TDuration WRITER_EXPIRATION_INTERVAL = TDuration::Minutes(5); -NActors::IActor* CreateKafkaProduceActor(const TActorId parent, const NACLib::TUserToken* userToken, const TString& clientDC) { - return new TKafkaProduceActor(parent, userToken, clientDC); +NActors::IActor* CreateKafkaProduceActor(const TContext::TPtr context) { + return new TKafkaProduceActor(context); } TString TKafkaProduceActor::LogPrefix() { @@ -121,7 +121,7 @@ void TKafkaProduceActor::HandleInit(TEvTxProxySchemeCache::TEvNavigateKeySetResu auto& topic = Topics[topicPath]; - if (info.SecurityObject->CheckAccess(NACLib::EAccessRights::UpdateRow, *UserToken)) { + if (info.SecurityObject->CheckAccess(NACLib::EAccessRights::UpdateRow, *Context->UserToken)) { topic.Status = OK; topic.ExpirationTime = now + TOPIC_OK_EXPIRATION_INTERVAL; for(auto& p : info.PQGroupInfo->Description.GetPartitions()) { @@ -479,7 +479,7 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) { } } - Send(Client, new TEvKafka::TEvResponse(correlationId, response)); + Send(Context->ConnectionId, new TEvKafka::TEvResponse(correlationId, response)); if (!pendingRequest.WaitAcceptingCookies.empty()) { if (!expired) { diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.h b/ydb/core/kafka_proxy/actors/kafka_produce_actor.h index 0849cfa3bd7..e6ee6990009 100644 --- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.h @@ -2,11 +2,11 @@ #include <library/cpp/actors/core/actor_bootstrapped.h> #include <ydb/core/base/tablet_pipe.h> +#include <ydb/core/kafka_proxy/kafka_events.h> #include <ydb/core/persqueue/writer/writer.h> #include <ydb/core/tx/scheme_cache/scheme_cache.h> -#include <ydb/library/aclib/aclib.h> -#include "../kafka_events.h" +#include "actors.h" namespace NKafka { @@ -37,10 +37,8 @@ class TKafkaProduceActor: public NActors::TActorBootstrapped<TKafkaProduceActor> }; public: - TKafkaProduceActor(const TActorId& client, const NACLib::TUserToken* userToken, const TString& clientDC) - : Client(client) - , UserToken(userToken) - , ClientDC(clientDC) { + TKafkaProduceActor(const TContext::TPtr context) + : Context(context) { } void Bootstrap(const NActors::TActorContext& ctx); @@ -135,8 +133,7 @@ private: void LogEvent(IEventHandle& ev); private: - const TActorId Client; - const NACLib::TUserToken* UserToken; + const TContext::TPtr Context; TString SourceId; TString ClientDC; diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp index cb090b736cd..a3cbc7fb685 100644 --- a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp @@ -1,22 +1,23 @@ #include <ydb/core/grpc_services/local_rpc/local_rpc.h> #include <ydb/public/api/grpc/ydb_auth_v1.grpc.pb.h> #include <ydb/core/base/ticket_parser.h> +#include <ydb/core/kafka_proxy/kafka_events.h> #include <library/cpp/actors/core/actor.h> #include "kafka_sasl_auth_actor.h" namespace NKafka { -NActors::IActor* CreateKafkaSaslAuthActor(const TActorId parent, const ui64 correlationId, const NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const EAuthSteps authStep, const TString saslMechanism, const TSaslAuthenticateRequestData* message) { - return new TKafkaSaslAuthActor(parent, correlationId, address, authStep, saslMechanism, message); +NActors::IActor* CreateKafkaSaslAuthActor(const TContext::TPtr context, const ui64 correlationId, const NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const TSaslAuthenticateRequestData* message) { + return new TKafkaSaslAuthActor(context, correlationId, address, message); } void TKafkaSaslAuthActor::Bootstrap(const NActors::TActorContext& ctx) { - if (AuthStep != EAuthSteps::WAIT_AUTH) { + if (Context->AuthenticationStep != EAuthSteps::WAIT_AUTH) { SendAuthFailedAndDie("Authentication failure. Request is not valid given the current SASL state.", EKafkaErrors::ILLEGAL_SASL_STATE, ctx); return; } - if (SaslMechanism != "PLAIN") { + if (Context->SaslMechanism != "PLAIN") { SendAuthFailedAndDie("Does not support the requested SASL mechanism.", EKafkaErrors::UNSUPPORTED_SASL_MECHANISM, ctx); return; } @@ -47,7 +48,7 @@ void TKafkaSaslAuthActor::Handle(NKikimr::TEvTicketParser::TEvAuthorizeTicketRes auto evResponse = std::make_shared<TEvKafka::TEvResponse>(CorrelationId, responseToClient); auto authResult = new TEvKafka::TEvAuthResult(EAuthSteps::SUCCESS, evResponse, ev->Get()->Token, Database); - Send(Parent, authResult); + Send(Context->ConnectionId, authResult); Die(ctx); } @@ -101,7 +102,7 @@ void TKafkaSaslAuthActor::SendAuthFailedAndDie(TString errorMessage, EKafkaError auto evResponse = std::make_shared<TEvKafka::TEvResponse>(CorrelationId, responseToClient); auto authResult = new TEvKafka::TEvAuthResult(EAuthSteps::FAILED, evResponse, nullptr, "", errorMessage); - Send(Parent, authResult); + Send(Context->ConnectionId, authResult); Die(ctx); } diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h index 6861a7ccc7d..f7141c82ee9 100644 --- a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h @@ -5,7 +5,6 @@ #include <ydb/public/api/grpc/ydb_auth_v1.grpc.pb.h> #include <library/cpp/actors/core/actor_bootstrapped.h> -#include "../kafka_events.h" #include "actors.h" namespace NKafka { @@ -21,12 +20,12 @@ struct TEvPrivate { static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)"); - struct TEvTokenReady : TEventLocal<TEvTokenReady, EvTokenReady> { + struct TEvTokenReady : NActors::TEventLocal<TEvTokenReady, EvTokenReady> { Ydb::Auth::LoginResult LoginResult; TString Database; }; - struct TEvAuthFailed : TEventLocal<TEvAuthFailed, EvAuthFailed> { + struct TEvAuthFailed : NActors::TEventLocal<TEvAuthFailed, EvAuthFailed> { TString ErrorMessage; }; }; @@ -38,13 +37,11 @@ struct TAuthData { }; public: - TKafkaSaslAuthActor(const TActorId parent, const ui64 correlationId, NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, EAuthSteps authStep, TString saslMechanism, const TSaslAuthenticateRequestData* message) - : Parent(parent) + TKafkaSaslAuthActor(const TContext::TPtr context, const ui64 correlationId, NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const TSaslAuthenticateRequestData* message) + : Context(context) , CorrelationId(correlationId) , AuthenticateRequestData(message) - , Address(address) - , AuthStep(authStep) - , SaslMechanism(saslMechanism) { + , Address(address) { } void Bootstrap(const NActors::TActorContext& ctx); @@ -68,12 +65,12 @@ private: bool TryParseAuthDataTo(TKafkaSaslAuthActor::TAuthData& authData, const NActors::TActorContext& ctx); private: - const TActorId Parent; + const TContext::TPtr Context; const ui64 CorrelationId; + const TSaslAuthenticateRequestData* AuthenticateRequestData; const NKikimr::NRawSocket::TNetworkConfig::TSocketAddressType Address; - const EAuthSteps AuthStep; - const TString SaslMechanism; + TString Database; }; diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.cpp index 170df467f3c..c768987ae13 100644 --- a/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.cpp @@ -2,13 +2,14 @@ #include <ydb/public/api/grpc/ydb_auth_v1.grpc.pb.h> #include <ydb/core/base/ticket_parser.h> #include <library/cpp/actors/core/actor.h> +#include <ydb/core/kafka_proxy/kafka_events.h> #include "kafka_sasl_handshake_actor.h" namespace NKafka { -NActors::IActor* CreateKafkaSaslHandshakeActor(const TActorId parent, const ui64 correlationId, const EAuthSteps authStep, const TSaslHandshakeRequestData* message) { - return new TKafkaSaslHandshakeActor(parent, correlationId, authStep, message); +NActors::IActor* CreateKafkaSaslHandshakeActor(const TContext::TPtr context, const ui64 correlationId, const TSaslHandshakeRequestData* message) { + return new TKafkaSaslHandshakeActor(context, correlationId, message); } void TKafkaSaslHandshakeActor::Bootstrap(const NActors::TActorContext& ctx) { @@ -17,7 +18,7 @@ void TKafkaSaslHandshakeActor::Bootstrap(const NActors::TActorContext& ctx) { } void TKafkaSaslHandshakeActor::Handshake() { - if (AuthStep != EAuthSteps::WAIT_HANDSHAKE) { + if (Context->AuthenticationStep != EAuthSteps::WAIT_HANDSHAKE) { SendResponse("Authentication failure. Request is not valid given the current SASL state.", EKafkaErrors::ILLEGAL_SASL_STATE, EAuthSteps::FAILED); return; } @@ -28,14 +29,14 @@ void TKafkaSaslHandshakeActor::Handshake() { SendResponse("", EKafkaErrors::NONE_ERROR, EAuthSteps::WAIT_AUTH, TStringBuilder() << HandshakeRequestData->Mechanism); } -void TKafkaSaslHandshakeActor::SendResponse(TString errorMessage, EKafkaErrors kafkaError, EAuthSteps authStep, TString saslMechanism) { +void TKafkaSaslHandshakeActor::SendResponse(const TString& errorMessage, EKafkaErrors kafkaError, EAuthSteps authStep, const TString& saslMechanism) { auto responseToClient = std::make_shared<TSaslHandshakeResponseData>(); responseToClient->ErrorCode = kafkaError; responseToClient->Mechanisms.insert(responseToClient->Mechanisms.end(), SUPPORTED_SASL_MECHANISMS.begin(), SUPPORTED_SASL_MECHANISMS.end()); auto evResponse = std::make_shared<TEvKafka::TEvResponse>(CorrelationId, responseToClient); auto handshakeResult = new TEvKafka::TEvHandshakeResult(authStep, evResponse, saslMechanism, errorMessage); - Send(Parent, handshakeResult); + Send(Context->ConnectionId, handshakeResult); } } // NKafka diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.h b/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.h index c516d4bef67..f736c07c7c3 100644 --- a/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.h @@ -5,7 +5,7 @@ #include <ydb/public/api/grpc/ydb_auth_v1.grpc.pb.h> #include <library/cpp/actors/core/actor_bootstrapped.h> -#include "../kafka_events.h" +#include "actors.h" namespace NKafka { @@ -16,24 +16,22 @@ const TVector<TString> SUPPORTED_SASL_MECHANISMS = { }; public: - TKafkaSaslHandshakeActor(const TActorId parent, const ui64 correlationId, const EAuthSteps authStep, const TSaslHandshakeRequestData* message) - : Parent(parent) + TKafkaSaslHandshakeActor(const TContext::TPtr context, const ui64 correlationId, const TSaslHandshakeRequestData* message) + : Context(context) , CorrelationId(correlationId) - , HandshakeRequestData(message) - , AuthStep(authStep) { + , HandshakeRequestData(message) { } void Bootstrap(const NActors::TActorContext& ctx); private: void Handshake(); - void SendResponse(TString errorMessage, EKafkaErrors kafkaError, EAuthSteps authStep, TString saslMechanism = ""); + void SendResponse(const TString& errorMessage, EKafkaErrors kafkaError, EAuthSteps authStep, const TString& saslMechanism = ""); private: - const TActorId Parent; + const TContext::TPtr Context; const ui64 CorrelationId; const TSaslHandshakeRequestData* HandshakeRequestData; - EAuthSteps AuthStep; }; } // NKafka diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp index e5aeb2f5bac..2ff392fa41b 100644 --- a/ydb/core/kafka_proxy/kafka_connection.cpp +++ b/ydb/core/kafka_proxy/kafka_connection.cpp @@ -4,7 +4,6 @@ #include "kafka_connection.h" #include "kafka_events.h" -#include "kafka_messages.h" #include "kafka_log_impl.h" #include "actors/actors.h" @@ -18,14 +17,6 @@ using namespace NActors; using namespace NKikimr; -NActors::IActor* CreateKafkaApiVersionsActor(const TActorId parent, const ui64 correlationId, const TApiVersionsRequestData* message); -NActors::IActor* CreateKafkaInitProducerIdActor(const TActorId parent, const ui64 correlationId, const TInitProducerIdRequestData* message); -NActors::IActor* CreateKafkaMetadataActor(const TActorId parent, const NACLib::TUserToken* userToken, const ui64 correlationId, const TMetadataRequestData* message, const NKikimrConfig::TKafkaProxyConfig& config); -NActors::IActor* CreateKafkaProduceActor(const TActorId parent, const NACLib::TUserToken* userToken, const TString& clientDC); -NActors::IActor* CreateKafkaSaslHandshakeActor(const TActorId parent, const ui64 correlationId, const EAuthSteps authStep, const TSaslHandshakeRequestData* message); -NActors::IActor* CreateKafkaSaslAuthActor(const TActorId parent, const ui64 correlationId, const NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const EAuthSteps authStep, const TString saslMechanism, const TSaslAuthenticateRequestData* message); - - char Hex(const unsigned char c) { return c < 10 ? '0' + c : 'A' + c - 10; } @@ -65,7 +56,6 @@ public: TIntrusivePtr<TSocketDescriptor> Socket; TSocketAddressType Address; - const NKikimrConfig::TKafkaProxyConfig& Config; THPTimer InactivityTimer; @@ -76,7 +66,6 @@ public: bool CloseConnection = false; NAddressClassifier::TLabeledAddressClassifier::TConstPtr DatacenterClassifier; - TString ClientDC; std::shared_ptr<Msg> Request; std::unordered_map<ui64, Msg::TPtr> PendingRequests; @@ -91,26 +80,23 @@ public: TActorId ProduceActorId; - TIntrusiveConstPtr<NACLib::TUserToken> UserToken; - EAuthSteps AuthStep; - TString Database; - TString SaslMechanism; - + TContext::TPtr Context; TKafkaConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address, const NKikimrConfig::TKafkaProxyConfig& config) : Socket(std::move(socket)) , Address(address) - , Config(config) , Step(SIZE_READ) , Demand(NoDemand) , InflightSize(0) - , AuthStep(EAuthSteps::WAIT_HANDSHAKE) { + , Context(std::make_shared<TContext>(config)) { SetNonBlock(); IsSslSupported = IsSslSupported && Socket->IsSslSupported(); } void Bootstrap() { + Context->ConnectionId = SelfId(); + Become(&TKafkaConnection::StateAccepting); Schedule(InactivityTimeout, InactivityEvent = new TEvPollerReady(nullptr, false, false)); KAFKA_LOG_I("incoming connection opened " << Address); @@ -206,35 +192,31 @@ protected: } void HandleMessage(TRequestHeaderData* header, const TApiVersionsRequestData* message) { - Register(CreateKafkaApiVersionsActor(SelfId(), header->CorrelationId, message)); + Register(CreateKafkaApiVersionsActor(Context, header->CorrelationId, message)); } void HandleMessage(const TRequestHeaderData* header, const TProduceRequestData* message, const TActorContext& ctx) { - if (!UserToken) { - KAFKA_LOG_ERROR("Unauthenificated produce"); - PassAway(); - } if (!ProduceActorId) { - ProduceActorId = ctx.RegisterWithSameMailbox(CreateKafkaProduceActor(SelfId(), UserToken.Get(), ClientDC)); + ProduceActorId = ctx.RegisterWithSameMailbox(CreateKafkaProduceActor(Context)); } Send(ProduceActorId, new TEvKafka::TEvProduceRequest(header->CorrelationId, message)); } void HandleMessage(const TRequestHeaderData* header, const TInitProducerIdRequestData* message) { - Register(CreateKafkaInitProducerIdActor(SelfId(), header->CorrelationId, message)); + Register(CreateKafkaInitProducerIdActor(Context, header->CorrelationId, message)); } void HandleMessage(TRequestHeaderData* header, const TMetadataRequestData* message) { - Register(CreateKafkaMetadataActor(SelfId(), UserToken.Get(), header->CorrelationId, message, Config)); + Register(CreateKafkaMetadataActor(Context, header->CorrelationId, message)); } void HandleMessage(const TRequestHeaderData* header, const TSaslAuthenticateRequestData* message) { - Register(CreateKafkaSaslAuthActor(SelfId(), header->CorrelationId, Address, AuthStep, SaslMechanism, message)); + Register(CreateKafkaSaslAuthActor(Context, header->CorrelationId, Address, message)); } void HandleMessage(const TRequestHeaderData* header, const TSaslHandshakeRequestData* message) { - Register(CreateKafkaSaslHandshakeActor(SelfId(), header->CorrelationId, AuthStep, message)); + Register(CreateKafkaSaslHandshakeActor(Context, header->CorrelationId, message)); } void ProcessRequest(const TActorContext& ctx) { @@ -242,6 +224,12 @@ protected: << ", Size=" << Request->Size); Msg::TPtr r = Request; + + if (!Context->Authenticated() && RequireAuthentication(static_cast<EApiKey>(Request->Header.RequestApiKey))) { + KAFKA_LOG_ERROR("unauthenticated request: ApiKey=" << Request->Header.RequestApiKey); + return PassAway(); + } + PendingRequestsQueue.push_back(r); PendingRequests[r->Header.CorrelationId] = r; @@ -293,10 +281,12 @@ protected: PassAway(); return; } - UserToken = event->UserToken; - Database = event->Database; - AuthStep = authStep; - KAFKA_LOG_D("Authentificated successful. SID=" << UserToken->GetUserSID()); + + Context->UserToken = event->UserToken; + Context->Database = event->Database; + Context->AuthenticationStep = authStep; + + KAFKA_LOG_D("Authentificated successful. SID=" << Context->UserToken->GetUserSID()); } void Handle(TEvKafka::TEvHandshakeResult::TPtr ev, const TActorContext& ctx) { @@ -309,8 +299,9 @@ protected: PassAway(); return; } - SaslMechanism = event->SaslMechanism; - AuthStep = authStep; + + Context->SaslMechanism = event->SaslMechanism; + Context->AuthenticationStep = authStep; } void Reply(const ui64 correlationId, TApiMessage::TPtr response, const TActorContext& ctx) { @@ -354,7 +345,7 @@ protected: TKafkaInt32 size = responseHeader.Size(headerVersion) + reply->Size(version); - TBufferedWriter buffer(Socket.Get(), Config.GetPacketSize()); + TBufferedWriter buffer(Socket.Get(), Context->Config.GetPacketSize()); TKafkaWritable writable(buffer); writable << size; @@ -400,14 +391,19 @@ protected: case SIZE_PREPARE: NormalizeNumber(Request->ExpectedSize); - if ((ui64)Request->ExpectedSize > Config.GetMaxMessageSize()) { + if ((ui64)Request->ExpectedSize > Context->Config.GetMaxMessageSize()) { KAFKA_LOG_ERROR("message is big. Size: " << Request->ExpectedSize); return PassAway(); } Step = INFLIGTH_CHECK; case INFLIGTH_CHECK: - if (InflightSize + Request->ExpectedSize > Config.GetMaxInflightSize()) { + if (!Context->Authenticated() && !PendingRequestsQueue.empty()) { + // Allow only one message to be processed at a time for non-authenticated users + return; + } + if (InflightSize + Request->ExpectedSize > Context->Config.GetMaxInflightSize()) { + // We limit the size of processed messages so as not to exceed the size of available memory return; } InflightSize += Request->ExpectedSize; diff --git a/ydb/core/kafka_proxy/ut/metarequest_ut.cpp b/ydb/core/kafka_proxy/ut/metarequest_ut.cpp index c1d76117796..0f54284e8c9 100644 --- a/ydb/core/kafka_proxy/ut/metarequest_ut.cpp +++ b/ydb/core/kafka_proxy/ut/metarequest_ut.cpp @@ -19,11 +19,15 @@ Y_UNIT_TEST_SUITE(TMetadataActorTests) { auto GetEvent(NPersQueue::TTestServer& server, const TActorId& edgeActor, const TVector<TString>& topics) { NKikimrConfig::TKafkaProxyConfig Config; - NACLib::TUserToken userToken("root@builtin", {}); auto* runtime = server.CleverServer->GetRuntime(); auto request = GetMetadataRequest(topics); - auto actorId = runtime->Register(new TKafkaMetadataActor(edgeActor, &userToken, 1, request.Get(), Config)); + + auto context = std::make_shared<TContext>(Config); + context->ConnectionId = edgeActor; + context->UserToken = new NACLib::TUserToken("root@builtin", {}); + + auto actorId = runtime->Register(new TKafkaMetadataActor(context, 1, request.Get())); runtime->EnableScheduleForActor(actorId); runtime->DispatchEvents(); Cerr << "Wait for response for topics: '"; diff --git a/ydb/services/lib/actors/pq_schema_actor.h b/ydb/services/lib/actors/pq_schema_actor.h index 5f01f97f944..86a5542edc3 100644 --- a/ydb/services/lib/actors/pq_schema_actor.h +++ b/ydb/services/lib/actors/pq_schema_actor.h @@ -526,7 +526,11 @@ namespace NKikimr::NGRpcProxy::V1 { if (this->ReplyIfNotTopic(ev)) { return false; } - PQGroupInfo = ev->Get()->Request->ResultSet[0].PQGroupInfo; + + auto& item = ev->Get()->Request->ResultSet[0]; + PQGroupInfo = item.PQGroupInfo; + Self = item.Self; + return true; } @@ -559,6 +563,7 @@ namespace NKikimr::NGRpcProxy::V1 { protected: THolder<TEvResponse> Response; TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TPQGroupInfo> PQGroupInfo; + TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TDirEntryInfo> Self; }; } diff --git a/ydb/services/persqueue_v1/actors/events.h b/ydb/services/persqueue_v1/actors/events.h index ee3b7b7dfa6..b2953e7a7f2 100644 --- a/ydb/services/persqueue_v1/actors/events.h +++ b/ydb/services/persqueue_v1/actors/events.h @@ -465,7 +465,8 @@ struct TEvPQProxy { { TEvPartitionLocationResponse() {} TVector<TPartitionLocationInfo> Partitions; - ui64 BalancerTabletId; + ui64 SchemeShardId; + ui64 PathId; }; }; diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index 07dbae4215f..c3ef4e28cb1 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -1415,8 +1415,10 @@ void TPartitionsLocationActor::HandleCacheNavigateResponse( if (!TBase::HandleCacheNavigateResponseBase(ev)) { return; } + if (ProcessTablets(PQGroupInfo->Description, this->ActorContext())) { - Response->BalancerTabletId = BalancerTabletId; + Response->PathId = Self->Info.GetPathId(); + Response->SchemeShardId = Self->Info.GetSchemeshardId(); } } |