aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-08-14 13:14:56 +0300
committertesseract <tesseract@yandex-team.com>2023-08-14 14:37:42 +0300
commit9547d91f81ce434a8cf900b1562bc87c88abfb79 (patch)
tree11263be3b578270dbda4ccaca8f9faa170720bf5
parentfebf300349ed585aa250bef06bfbdb91597a08bd (diff)
downloadydb-9547d91f81ce434a8cf900b1562bc87c88abfb79.tar.gz
Improvements to PRODUCE
- Block non-authentificated requests except API_VERSION, SASL_HANDSHAKE and SASL_AUTHENTIFICATE - All unauthentificated request process sequentially - Use path id as topic id - refactoring - extract Context
-rw-r--r--ydb/core/kafka_proxy/actors/actors.h46
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp8
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_api_versions_actor.h9
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp15
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.h9
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp25
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_metadata_actor.h18
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp8
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_produce_actor.h13
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp13
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h19
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.cpp11
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.h14
-rw-r--r--ydb/core/kafka_proxy/kafka_connection.cpp70
-rw-r--r--ydb/core/kafka_proxy/ut/metarequest_ut.cpp8
-rw-r--r--ydb/services/lib/actors/pq_schema_actor.h7
-rw-r--r--ydb/services/persqueue_v1/actors/events.h3
-rw-r--r--ydb/services/persqueue_v1/actors/schema_actors.cpp4
18 files changed, 176 insertions, 124 deletions
diff --git a/ydb/core/kafka_proxy/actors/actors.h b/ydb/core/kafka_proxy/actors/actors.h
index 7b83a8f60cb..1b98abe4d9f 100644
--- a/ydb/core/kafka_proxy/actors/actors.h
+++ b/ydb/core/kafka_proxy/actors/actors.h
@@ -1,7 +1,51 @@
#pragma once
+#include <ydb/core/protos/config.pb.h>
+#include <ydb/library/aclib/aclib.h>
+
+#include "../kafka_messages.h"
+
namespace NKafka {
- enum EAuthSteps { WAIT_HANDSHAKE, WAIT_AUTH, SUCCESS, FAILED };
+enum EAuthSteps {
+ WAIT_HANDSHAKE,
+ WAIT_AUTH,
+ SUCCESS,
+ FAILED
+};
+
+struct TContext {
+ using TPtr = std::shared_ptr<TContext>;
+
+ TContext(const NKikimrConfig::TKafkaProxyConfig& config)
+ : Config(config) {
+ }
+
+ const NKikimrConfig::TKafkaProxyConfig& Config;
+
+ TActorId ConnectionId;
+
+
+ EAuthSteps AuthenticationStep = EAuthSteps::WAIT_HANDSHAKE;
+ TString SaslMechanism;
+
+ TString Database;
+ TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
+ TString ClientDC;
+
+ bool Authenticated() { return AuthenticationStep == SUCCESS; }
+};
+
+inline bool RequireAuthentication(EApiKey apiKey) {
+ return !(EApiKey::API_VERSIONS == apiKey || EApiKey::SASL_HANDSHAKE == apiKey || EApiKey::SASL_AUTHENTICATE == apiKey);
+}
+
+NActors::IActor* CreateKafkaApiVersionsActor(const TContext::TPtr context, const ui64 correlationId, const TApiVersionsRequestData* message);
+NActors::IActor* CreateKafkaInitProducerIdActor(const TContext::TPtr context, const ui64 correlationId, const TInitProducerIdRequestData* message);
+NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context, const ui64 correlationId, const TMetadataRequestData* message);
+NActors::IActor* CreateKafkaProduceActor(const TContext::TPtr context);
+NActors::IActor* CreateKafkaSaslHandshakeActor(const TContext::TPtr context, const ui64 correlationId, const TSaslHandshakeRequestData* message);
+NActors::IActor* CreateKafkaSaslAuthActor(const TContext::TPtr context, const ui64 correlationId, const NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const TSaslAuthenticateRequestData* message);
+
} // namespace NKafka
diff --git a/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp
index 8837bcb230a..bb24a616156 100644
--- a/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp
@@ -1,9 +1,11 @@
#include "kafka_api_versions_actor.h"
+#include <ydb/core/kafka_proxy/kafka_events.h>
+
namespace NKafka {
-NActors::IActor* CreateKafkaApiVersionsActor(const TActorId parent, const ui64 correlationId, const TApiVersionsRequestData* message) {
- return new TKafkaApiVersionsActor(parent, correlationId, message);
+NActors::IActor* CreateKafkaApiVersionsActor(const TContext::TPtr context, const ui64 correlationId, const TApiVersionsRequestData* message) {
+ return new TKafkaApiVersionsActor(context, correlationId, message);
}
TApiVersionsResponseData::TPtr GetApiVersions() {
@@ -40,7 +42,7 @@ TApiVersionsResponseData::TPtr GetApiVersions() {
void TKafkaApiVersionsActor::Bootstrap(const NActors::TActorContext& ctx) {
Y_UNUSED(Message);
- Send(Parent, new TEvKafka::TEvResponse(CorrelationId, GetApiVersions()));
+ Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, GetApiVersions()));
Die(ctx);
}
diff --git a/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.h b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.h
index 4a00822b864..69adc294f60 100644
--- a/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.h
+++ b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.h
@@ -1,12 +1,13 @@
-#include "../kafka_events.h"
+#include "actors.h"
+
#include <library/cpp/actors/core/actor_bootstrapped.h>
namespace NKafka {
class TKafkaApiVersionsActor: public NActors::TActorBootstrapped<TKafkaApiVersionsActor> {
public:
- TKafkaApiVersionsActor(const TActorId parent, const ui64 correlationId, const TApiVersionsRequestData* message)
- : Parent(parent)
+ TKafkaApiVersionsActor(const TContext::TPtr context, const ui64 correlationId, const TApiVersionsRequestData* message)
+ : Context(context)
, CorrelationId(correlationId)
, Message(message) {
}
@@ -14,7 +15,7 @@ public:
void Bootstrap(const NActors::TActorContext& ctx);
private:
- const TActorId Parent;
+ const TContext::TPtr Context;
const ui64 CorrelationId;
const TApiVersionsRequestData* Message;
};
diff --git a/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp
index b5ec20346b3..71bd8503fca 100644
--- a/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp
@@ -1,16 +1,19 @@
#include "kafka_init_producer_id_actor.h"
+#include <util/random/random.h>
+#include <ydb/core/kafka_proxy/kafka_events.h>
+
namespace NKafka {
-NActors::IActor* CreateKafkaInitProducerIdActor(const TActorId parent, const ui64 correlationId, const TInitProducerIdRequestData* message) {
- return new TKafkaInitProducerIdActor(parent, correlationId, message);
+NActors::IActor* CreateKafkaInitProducerIdActor(const TContext::TPtr context, const ui64 correlationId, const TInitProducerIdRequestData* message) {
+ return new TKafkaInitProducerIdActor(context, correlationId, message);
}
-TInitProducerIdResponseData::TPtr GetResponse() {
+TInitProducerIdResponseData::TPtr GetResponse(const NActors::TActorContext& ctx) {
TInitProducerIdResponseData::TPtr response = std::make_shared<TInitProducerIdResponseData>();
- response->ProducerEpoch = 1;
- response->ProducerId = 1;
+ response->ProducerEpoch = 0;
+ response->ProducerId = ((ctx.Now().MilliSeconds() << 16) & 0x7FFFFFFFFFFF) + RandomNumber<ui16>();
response->ErrorCode = EKafkaErrors::NONE_ERROR;
response->ThrottleTimeMs = 0;
@@ -20,7 +23,7 @@ TInitProducerIdResponseData::TPtr GetResponse() {
void TKafkaInitProducerIdActor::Bootstrap(const NActors::TActorContext& ctx) {
Y_UNUSED(Message);
- Send(Parent, new TEvKafka::TEvResponse(CorrelationId, GetResponse()));
+ Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, GetResponse(ctx)));
Die(ctx);
}
diff --git a/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.h b/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.h
index 78e5622d2a6..08e5f755f0a 100644
--- a/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.h
+++ b/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.h
@@ -1,12 +1,13 @@
-#include "../kafka_events.h"
+#include "actors.h"
+
#include <library/cpp/actors/core/actor_bootstrapped.h>
namespace NKafka {
class TKafkaInitProducerIdActor: public NActors::TActorBootstrapped<TKafkaInitProducerIdActor> {
public:
- TKafkaInitProducerIdActor(const TActorId parent, const ui64 correlationId, const TInitProducerIdRequestData* message)
- : Parent(parent)
+ TKafkaInitProducerIdActor(const TContext::TPtr context, const ui64 correlationId, const TInitProducerIdRequestData* message)
+ : Context(context)
, CorrelationId(correlationId)
, Message(message) {
}
@@ -14,7 +15,7 @@ public:
void Bootstrap(const NActors::TActorContext& ctx);
private:
- const TActorId Parent;
+ const TContext::TPtr Context;
const ui64 CorrelationId;
const TInitProducerIdRequestData* Message;
};
diff --git a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
index 9b251733ae9..0951a66e05c 100644
--- a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
@@ -1,21 +1,22 @@
#include "kafka_metadata_actor.h"
+
+#include <ydb/core/kafka_proxy/kafka_events.h>
#include <ydb/services/persqueue_v1/actors/schema_actors.h>
namespace NKafka {
using namespace NKikimr::NGRpcProxy::V1;
-NActors::IActor* CreateKafkaMetadataActor(const TActorId parent,
- const NACLib::TUserToken* userToken,
+NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context,
const ui64 correlationId,
- const TMetadataRequestData* message,
- const NKikimrConfig::TKafkaProxyConfig& config) {
- return new TKafkaMetadataActor(parent, userToken, correlationId, message, config);
+ const TMetadataRequestData* message) {
+ return new TKafkaMetadataActor(context, correlationId, message);
}
void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) {
Response->Topics.resize(Message->Topics.size());
+
THashMap<TString, TActorId> partitionActors;
- for (auto i = 0u; i < Message->Topics.size(); ++i) {
+ for (size_t i = 0; i < Message->Topics.size(); ++i) {
Response->Topics[i] = TMetadataResponseData::TMetadataResponseTopic{};
auto& reqTopic = Message->Topics[i];
Response->Topics[i].Name = reqTopic.Name.value_or("");
@@ -42,12 +43,12 @@ void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) {
}
TActorId TKafkaMetadataActor::SendTopicRequest(const TMetadataRequestData::TMetadataRequestTopic& topicRequest) {
- KAFKA_LOG_D("Describe partitions locations for topic '" << *topicRequest.Name << "' for user '" << UserToken->GetUserSID() << "'");
+ KAFKA_LOG_D("Describe partitions locations for topic '" << *topicRequest.Name << "' for user '" << Context->UserToken->GetUserSID() << "'");
TGetPartitionsLocationRequest locationRequest{};
locationRequest.Topic = topicRequest.Name.value();
- locationRequest.Token = UserToken->GetSerializedToken();
- locationRequest.Database = "/Root/test"; // TODO
+ locationRequest.Token = Context->UserToken->GetSerializedToken();
+ locationRequest.Database = Context->Database;
PendingResponses++;
@@ -62,7 +63,7 @@ void TKafkaMetadataActor::AddTopicError(
void TKafkaMetadataActor::AddTopicResponse(TMetadataResponseData::TMetadataResponseTopic& topic, TEvLocationResponse* response) {
topic.ErrorCode = NONE_ERROR;
- topic.TopicId = response->BalancerTabletId;
+ topic.TopicId = TKafkaUuid(response->SchemeShardId, response->PathId);
topic.Partitions.reserve(response->Partitions.size());
for (const auto& part : response->Partitions) {
TMetadataResponseData::TMetadataResponseTopic::PartitionsMeta::ItemType responsePartition;
@@ -77,7 +78,7 @@ void TKafkaMetadataActor::AddTopicResponse(TMetadataResponseData::TMetadataRespo
auto broker = TMetadataResponseData::TMetadataResponseBroker{};
broker.NodeId = part.NodeId;
broker.Host = part.Hostname;
- broker.Port = Config.GetListeningPort();
+ broker.Port = Context->Config.GetListeningPort();
Response->Brokers.emplace_back(std::move(broker));
}
topic.Partitions.emplace_back(std::move(responsePartition));
@@ -133,7 +134,7 @@ void TKafkaMetadataActor::HandleResponse(TEvLocationResponse::TPtr ev, const TAc
void TKafkaMetadataActor::RespondIfRequired(const TActorContext& ctx) {
if (PendingResponses == 0) {
- Send(Parent, new TEvKafka::TEvResponse(CorrelationId, Response));
+ Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, Response));
Die(ctx);
}
}
diff --git a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h
index e77617571d7..d55659d0ace 100644
--- a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h
+++ b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h
@@ -1,4 +1,5 @@
-#include "../kafka_events.h"
+#include "actors.h"
+
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <ydb/library/aclib/aclib.h>
#include <ydb/services/persqueue_v1/actors/events.h>
@@ -7,12 +8,10 @@ namespace NKafka {
class TKafkaMetadataActor: public NActors::TActorBootstrapped<TKafkaMetadataActor> {
public:
- TKafkaMetadataActor(const TActorId& parent, const NACLib::TUserToken* userToken, const ui64 correlationId, const TMetadataRequestData* message, const NKikimrConfig::TKafkaProxyConfig& config)
- : Parent(parent)
- , UserToken(userToken)
+ TKafkaMetadataActor(const TContext::TPtr context, const ui64 correlationId, const TMetadataRequestData* message)
+ : Context(context)
, CorrelationId(correlationId)
, Message(message)
- , Config(config)
, Response(new TMetadataResponseData())
{}
@@ -22,11 +21,12 @@ private:
using TEvLocationResponse = NKikimr::NGRpcProxy::V1::TEvPQProxy::TEvPartitionLocationResponse;
TActorId SendTopicRequest(const TMetadataRequestData::TMetadataRequestTopic& topicRequest);
- void HandleResponse(TEvLocationResponse::TPtr ev, const TActorContext& ctx);
+ void HandleResponse(TEvLocationResponse::TPtr ev, const NActors::TActorContext& ctx);
void AddTopicResponse(TMetadataResponseData::TMetadataResponseTopic& topic, TEvLocationResponse* response);
void AddTopicError(TMetadataResponseData::TMetadataResponseTopic& topic, EKafkaErrors errorCode);
- void RespondIfRequired(const TActorContext& ctx);
+ void RespondIfRequired(const NActors::TActorContext& ctx);
+
STATEFN(StateWork) {
switch (ev->GetTypeRewrite()) {
HFunc(TEvLocationResponse, HandleResponse);
@@ -36,11 +36,9 @@ private:
TString LogPrefix() const;
private:
- const TActorId Parent;
- const NACLib::TUserToken* UserToken;
+ const TContext::TPtr Context;
const ui64 CorrelationId;
const TMetadataRequestData* Message;
- const NKikimrConfig::TKafkaProxyConfig& Config;
ui64 PendingResponses = 0;
diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
index a039cecb6e5..7517277ae43 100644
--- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
@@ -15,8 +15,8 @@ static constexpr TDuration TOPIC_UNATHORIZED_EXPIRATION_INTERVAL = TDuration::Mi
static constexpr TDuration REQUEST_EXPIRATION_INTERVAL = TDuration::Seconds(30);
static constexpr TDuration WRITER_EXPIRATION_INTERVAL = TDuration::Minutes(5);
-NActors::IActor* CreateKafkaProduceActor(const TActorId parent, const NACLib::TUserToken* userToken, const TString& clientDC) {
- return new TKafkaProduceActor(parent, userToken, clientDC);
+NActors::IActor* CreateKafkaProduceActor(const TContext::TPtr context) {
+ return new TKafkaProduceActor(context);
}
TString TKafkaProduceActor::LogPrefix() {
@@ -121,7 +121,7 @@ void TKafkaProduceActor::HandleInit(TEvTxProxySchemeCache::TEvNavigateKeySetResu
auto& topic = Topics[topicPath];
- if (info.SecurityObject->CheckAccess(NACLib::EAccessRights::UpdateRow, *UserToken)) {
+ if (info.SecurityObject->CheckAccess(NACLib::EAccessRights::UpdateRow, *Context->UserToken)) {
topic.Status = OK;
topic.ExpirationTime = now + TOPIC_OK_EXPIRATION_INTERVAL;
for(auto& p : info.PQGroupInfo->Description.GetPartitions()) {
@@ -479,7 +479,7 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) {
}
}
- Send(Client, new TEvKafka::TEvResponse(correlationId, response));
+ Send(Context->ConnectionId, new TEvKafka::TEvResponse(correlationId, response));
if (!pendingRequest.WaitAcceptingCookies.empty()) {
if (!expired) {
diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.h b/ydb/core/kafka_proxy/actors/kafka_produce_actor.h
index 0849cfa3bd7..e6ee6990009 100644
--- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.h
+++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.h
@@ -2,11 +2,11 @@
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <ydb/core/base/tablet_pipe.h>
+#include <ydb/core/kafka_proxy/kafka_events.h>
#include <ydb/core/persqueue/writer/writer.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
-#include <ydb/library/aclib/aclib.h>
-#include "../kafka_events.h"
+#include "actors.h"
namespace NKafka {
@@ -37,10 +37,8 @@ class TKafkaProduceActor: public NActors::TActorBootstrapped<TKafkaProduceActor>
};
public:
- TKafkaProduceActor(const TActorId& client, const NACLib::TUserToken* userToken, const TString& clientDC)
- : Client(client)
- , UserToken(userToken)
- , ClientDC(clientDC) {
+ TKafkaProduceActor(const TContext::TPtr context)
+ : Context(context) {
}
void Bootstrap(const NActors::TActorContext& ctx);
@@ -135,8 +133,7 @@ private:
void LogEvent(IEventHandle& ev);
private:
- const TActorId Client;
- const NACLib::TUserToken* UserToken;
+ const TContext::TPtr Context;
TString SourceId;
TString ClientDC;
diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
index cb090b736cd..a3cbc7fb685 100644
--- a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
@@ -1,22 +1,23 @@
#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
#include <ydb/public/api/grpc/ydb_auth_v1.grpc.pb.h>
#include <ydb/core/base/ticket_parser.h>
+#include <ydb/core/kafka_proxy/kafka_events.h>
#include <library/cpp/actors/core/actor.h>
#include "kafka_sasl_auth_actor.h"
namespace NKafka {
-NActors::IActor* CreateKafkaSaslAuthActor(const TActorId parent, const ui64 correlationId, const NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const EAuthSteps authStep, const TString saslMechanism, const TSaslAuthenticateRequestData* message) {
- return new TKafkaSaslAuthActor(parent, correlationId, address, authStep, saslMechanism, message);
+NActors::IActor* CreateKafkaSaslAuthActor(const TContext::TPtr context, const ui64 correlationId, const NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const TSaslAuthenticateRequestData* message) {
+ return new TKafkaSaslAuthActor(context, correlationId, address, message);
}
void TKafkaSaslAuthActor::Bootstrap(const NActors::TActorContext& ctx) {
- if (AuthStep != EAuthSteps::WAIT_AUTH) {
+ if (Context->AuthenticationStep != EAuthSteps::WAIT_AUTH) {
SendAuthFailedAndDie("Authentication failure. Request is not valid given the current SASL state.", EKafkaErrors::ILLEGAL_SASL_STATE, ctx);
return;
}
- if (SaslMechanism != "PLAIN") {
+ if (Context->SaslMechanism != "PLAIN") {
SendAuthFailedAndDie("Does not support the requested SASL mechanism.", EKafkaErrors::UNSUPPORTED_SASL_MECHANISM, ctx);
return;
}
@@ -47,7 +48,7 @@ void TKafkaSaslAuthActor::Handle(NKikimr::TEvTicketParser::TEvAuthorizeTicketRes
auto evResponse = std::make_shared<TEvKafka::TEvResponse>(CorrelationId, responseToClient);
auto authResult = new TEvKafka::TEvAuthResult(EAuthSteps::SUCCESS, evResponse, ev->Get()->Token, Database);
- Send(Parent, authResult);
+ Send(Context->ConnectionId, authResult);
Die(ctx);
}
@@ -101,7 +102,7 @@ void TKafkaSaslAuthActor::SendAuthFailedAndDie(TString errorMessage, EKafkaError
auto evResponse = std::make_shared<TEvKafka::TEvResponse>(CorrelationId, responseToClient);
auto authResult = new TEvKafka::TEvAuthResult(EAuthSteps::FAILED, evResponse, nullptr, "", errorMessage);
- Send(Parent, authResult);
+ Send(Context->ConnectionId, authResult);
Die(ctx);
}
diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h
index 6861a7ccc7d..f7141c82ee9 100644
--- a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h
+++ b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h
@@ -5,7 +5,6 @@
#include <ydb/public/api/grpc/ydb_auth_v1.grpc.pb.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
-#include "../kafka_events.h"
#include "actors.h"
namespace NKafka {
@@ -21,12 +20,12 @@ struct TEvPrivate {
static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)");
- struct TEvTokenReady : TEventLocal<TEvTokenReady, EvTokenReady> {
+ struct TEvTokenReady : NActors::TEventLocal<TEvTokenReady, EvTokenReady> {
Ydb::Auth::LoginResult LoginResult;
TString Database;
};
- struct TEvAuthFailed : TEventLocal<TEvAuthFailed, EvAuthFailed> {
+ struct TEvAuthFailed : NActors::TEventLocal<TEvAuthFailed, EvAuthFailed> {
TString ErrorMessage;
};
};
@@ -38,13 +37,11 @@ struct TAuthData {
};
public:
- TKafkaSaslAuthActor(const TActorId parent, const ui64 correlationId, NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, EAuthSteps authStep, TString saslMechanism, const TSaslAuthenticateRequestData* message)
- : Parent(parent)
+ TKafkaSaslAuthActor(const TContext::TPtr context, const ui64 correlationId, NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const TSaslAuthenticateRequestData* message)
+ : Context(context)
, CorrelationId(correlationId)
, AuthenticateRequestData(message)
- , Address(address)
- , AuthStep(authStep)
- , SaslMechanism(saslMechanism) {
+ , Address(address) {
}
void Bootstrap(const NActors::TActorContext& ctx);
@@ -68,12 +65,12 @@ private:
bool TryParseAuthDataTo(TKafkaSaslAuthActor::TAuthData& authData, const NActors::TActorContext& ctx);
private:
- const TActorId Parent;
+ const TContext::TPtr Context;
const ui64 CorrelationId;
+
const TSaslAuthenticateRequestData* AuthenticateRequestData;
const NKikimr::NRawSocket::TNetworkConfig::TSocketAddressType Address;
- const EAuthSteps AuthStep;
- const TString SaslMechanism;
+
TString Database;
};
diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.cpp
index 170df467f3c..c768987ae13 100644
--- a/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.cpp
@@ -2,13 +2,14 @@
#include <ydb/public/api/grpc/ydb_auth_v1.grpc.pb.h>
#include <ydb/core/base/ticket_parser.h>
#include <library/cpp/actors/core/actor.h>
+#include <ydb/core/kafka_proxy/kafka_events.h>
#include "kafka_sasl_handshake_actor.h"
namespace NKafka {
-NActors::IActor* CreateKafkaSaslHandshakeActor(const TActorId parent, const ui64 correlationId, const EAuthSteps authStep, const TSaslHandshakeRequestData* message) {
- return new TKafkaSaslHandshakeActor(parent, correlationId, authStep, message);
+NActors::IActor* CreateKafkaSaslHandshakeActor(const TContext::TPtr context, const ui64 correlationId, const TSaslHandshakeRequestData* message) {
+ return new TKafkaSaslHandshakeActor(context, correlationId, message);
}
void TKafkaSaslHandshakeActor::Bootstrap(const NActors::TActorContext& ctx) {
@@ -17,7 +18,7 @@ void TKafkaSaslHandshakeActor::Bootstrap(const NActors::TActorContext& ctx) {
}
void TKafkaSaslHandshakeActor::Handshake() {
- if (AuthStep != EAuthSteps::WAIT_HANDSHAKE) {
+ if (Context->AuthenticationStep != EAuthSteps::WAIT_HANDSHAKE) {
SendResponse("Authentication failure. Request is not valid given the current SASL state.", EKafkaErrors::ILLEGAL_SASL_STATE, EAuthSteps::FAILED);
return;
}
@@ -28,14 +29,14 @@ void TKafkaSaslHandshakeActor::Handshake() {
SendResponse("", EKafkaErrors::NONE_ERROR, EAuthSteps::WAIT_AUTH, TStringBuilder() << HandshakeRequestData->Mechanism);
}
-void TKafkaSaslHandshakeActor::SendResponse(TString errorMessage, EKafkaErrors kafkaError, EAuthSteps authStep, TString saslMechanism) {
+void TKafkaSaslHandshakeActor::SendResponse(const TString& errorMessage, EKafkaErrors kafkaError, EAuthSteps authStep, const TString& saslMechanism) {
auto responseToClient = std::make_shared<TSaslHandshakeResponseData>();
responseToClient->ErrorCode = kafkaError;
responseToClient->Mechanisms.insert(responseToClient->Mechanisms.end(), SUPPORTED_SASL_MECHANISMS.begin(), SUPPORTED_SASL_MECHANISMS.end());
auto evResponse = std::make_shared<TEvKafka::TEvResponse>(CorrelationId, responseToClient);
auto handshakeResult = new TEvKafka::TEvHandshakeResult(authStep, evResponse, saslMechanism, errorMessage);
- Send(Parent, handshakeResult);
+ Send(Context->ConnectionId, handshakeResult);
}
} // NKafka
diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.h b/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.h
index c516d4bef67..f736c07c7c3 100644
--- a/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.h
+++ b/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.h
@@ -5,7 +5,7 @@
#include <ydb/public/api/grpc/ydb_auth_v1.grpc.pb.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
-#include "../kafka_events.h"
+#include "actors.h"
namespace NKafka {
@@ -16,24 +16,22 @@ const TVector<TString> SUPPORTED_SASL_MECHANISMS = {
};
public:
- TKafkaSaslHandshakeActor(const TActorId parent, const ui64 correlationId, const EAuthSteps authStep, const TSaslHandshakeRequestData* message)
- : Parent(parent)
+ TKafkaSaslHandshakeActor(const TContext::TPtr context, const ui64 correlationId, const TSaslHandshakeRequestData* message)
+ : Context(context)
, CorrelationId(correlationId)
- , HandshakeRequestData(message)
- , AuthStep(authStep) {
+ , HandshakeRequestData(message) {
}
void Bootstrap(const NActors::TActorContext& ctx);
private:
void Handshake();
- void SendResponse(TString errorMessage, EKafkaErrors kafkaError, EAuthSteps authStep, TString saslMechanism = "");
+ void SendResponse(const TString& errorMessage, EKafkaErrors kafkaError, EAuthSteps authStep, const TString& saslMechanism = "");
private:
- const TActorId Parent;
+ const TContext::TPtr Context;
const ui64 CorrelationId;
const TSaslHandshakeRequestData* HandshakeRequestData;
- EAuthSteps AuthStep;
};
} // NKafka
diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp
index e5aeb2f5bac..2ff392fa41b 100644
--- a/ydb/core/kafka_proxy/kafka_connection.cpp
+++ b/ydb/core/kafka_proxy/kafka_connection.cpp
@@ -4,7 +4,6 @@
#include "kafka_connection.h"
#include "kafka_events.h"
-#include "kafka_messages.h"
#include "kafka_log_impl.h"
#include "actors/actors.h"
@@ -18,14 +17,6 @@ using namespace NActors;
using namespace NKikimr;
-NActors::IActor* CreateKafkaApiVersionsActor(const TActorId parent, const ui64 correlationId, const TApiVersionsRequestData* message);
-NActors::IActor* CreateKafkaInitProducerIdActor(const TActorId parent, const ui64 correlationId, const TInitProducerIdRequestData* message);
-NActors::IActor* CreateKafkaMetadataActor(const TActorId parent, const NACLib::TUserToken* userToken, const ui64 correlationId, const TMetadataRequestData* message, const NKikimrConfig::TKafkaProxyConfig& config);
-NActors::IActor* CreateKafkaProduceActor(const TActorId parent, const NACLib::TUserToken* userToken, const TString& clientDC);
-NActors::IActor* CreateKafkaSaslHandshakeActor(const TActorId parent, const ui64 correlationId, const EAuthSteps authStep, const TSaslHandshakeRequestData* message);
-NActors::IActor* CreateKafkaSaslAuthActor(const TActorId parent, const ui64 correlationId, const NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const EAuthSteps authStep, const TString saslMechanism, const TSaslAuthenticateRequestData* message);
-
-
char Hex(const unsigned char c) {
return c < 10 ? '0' + c : 'A' + c - 10;
}
@@ -65,7 +56,6 @@ public:
TIntrusivePtr<TSocketDescriptor> Socket;
TSocketAddressType Address;
- const NKikimrConfig::TKafkaProxyConfig& Config;
THPTimer InactivityTimer;
@@ -76,7 +66,6 @@ public:
bool CloseConnection = false;
NAddressClassifier::TLabeledAddressClassifier::TConstPtr DatacenterClassifier;
- TString ClientDC;
std::shared_ptr<Msg> Request;
std::unordered_map<ui64, Msg::TPtr> PendingRequests;
@@ -91,26 +80,23 @@ public:
TActorId ProduceActorId;
- TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
- EAuthSteps AuthStep;
- TString Database;
- TString SaslMechanism;
-
+ TContext::TPtr Context;
TKafkaConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address,
const NKikimrConfig::TKafkaProxyConfig& config)
: Socket(std::move(socket))
, Address(address)
- , Config(config)
, Step(SIZE_READ)
, Demand(NoDemand)
, InflightSize(0)
- , AuthStep(EAuthSteps::WAIT_HANDSHAKE) {
+ , Context(std::make_shared<TContext>(config)) {
SetNonBlock();
IsSslSupported = IsSslSupported && Socket->IsSslSupported();
}
void Bootstrap() {
+ Context->ConnectionId = SelfId();
+
Become(&TKafkaConnection::StateAccepting);
Schedule(InactivityTimeout, InactivityEvent = new TEvPollerReady(nullptr, false, false));
KAFKA_LOG_I("incoming connection opened " << Address);
@@ -206,35 +192,31 @@ protected:
}
void HandleMessage(TRequestHeaderData* header, const TApiVersionsRequestData* message) {
- Register(CreateKafkaApiVersionsActor(SelfId(), header->CorrelationId, message));
+ Register(CreateKafkaApiVersionsActor(Context, header->CorrelationId, message));
}
void HandleMessage(const TRequestHeaderData* header, const TProduceRequestData* message, const TActorContext& ctx) {
- if (!UserToken) {
- KAFKA_LOG_ERROR("Unauthenificated produce");
- PassAway();
- }
if (!ProduceActorId) {
- ProduceActorId = ctx.RegisterWithSameMailbox(CreateKafkaProduceActor(SelfId(), UserToken.Get(), ClientDC));
+ ProduceActorId = ctx.RegisterWithSameMailbox(CreateKafkaProduceActor(Context));
}
Send(ProduceActorId, new TEvKafka::TEvProduceRequest(header->CorrelationId, message));
}
void HandleMessage(const TRequestHeaderData* header, const TInitProducerIdRequestData* message) {
- Register(CreateKafkaInitProducerIdActor(SelfId(), header->CorrelationId, message));
+ Register(CreateKafkaInitProducerIdActor(Context, header->CorrelationId, message));
}
void HandleMessage(TRequestHeaderData* header, const TMetadataRequestData* message) {
- Register(CreateKafkaMetadataActor(SelfId(), UserToken.Get(), header->CorrelationId, message, Config));
+ Register(CreateKafkaMetadataActor(Context, header->CorrelationId, message));
}
void HandleMessage(const TRequestHeaderData* header, const TSaslAuthenticateRequestData* message) {
- Register(CreateKafkaSaslAuthActor(SelfId(), header->CorrelationId, Address, AuthStep, SaslMechanism, message));
+ Register(CreateKafkaSaslAuthActor(Context, header->CorrelationId, Address, message));
}
void HandleMessage(const TRequestHeaderData* header, const TSaslHandshakeRequestData* message) {
- Register(CreateKafkaSaslHandshakeActor(SelfId(), header->CorrelationId, AuthStep, message));
+ Register(CreateKafkaSaslHandshakeActor(Context, header->CorrelationId, message));
}
void ProcessRequest(const TActorContext& ctx) {
@@ -242,6 +224,12 @@ protected:
<< ", Size=" << Request->Size);
Msg::TPtr r = Request;
+
+ if (!Context->Authenticated() && RequireAuthentication(static_cast<EApiKey>(Request->Header.RequestApiKey))) {
+ KAFKA_LOG_ERROR("unauthenticated request: ApiKey=" << Request->Header.RequestApiKey);
+ return PassAway();
+ }
+
PendingRequestsQueue.push_back(r);
PendingRequests[r->Header.CorrelationId] = r;
@@ -293,10 +281,12 @@ protected:
PassAway();
return;
}
- UserToken = event->UserToken;
- Database = event->Database;
- AuthStep = authStep;
- KAFKA_LOG_D("Authentificated successful. SID=" << UserToken->GetUserSID());
+
+ Context->UserToken = event->UserToken;
+ Context->Database = event->Database;
+ Context->AuthenticationStep = authStep;
+
+ KAFKA_LOG_D("Authentificated successful. SID=" << Context->UserToken->GetUserSID());
}
void Handle(TEvKafka::TEvHandshakeResult::TPtr ev, const TActorContext& ctx) {
@@ -309,8 +299,9 @@ protected:
PassAway();
return;
}
- SaslMechanism = event->SaslMechanism;
- AuthStep = authStep;
+
+ Context->SaslMechanism = event->SaslMechanism;
+ Context->AuthenticationStep = authStep;
}
void Reply(const ui64 correlationId, TApiMessage::TPtr response, const TActorContext& ctx) {
@@ -354,7 +345,7 @@ protected:
TKafkaInt32 size = responseHeader.Size(headerVersion) + reply->Size(version);
- TBufferedWriter buffer(Socket.Get(), Config.GetPacketSize());
+ TBufferedWriter buffer(Socket.Get(), Context->Config.GetPacketSize());
TKafkaWritable writable(buffer);
writable << size;
@@ -400,14 +391,19 @@ protected:
case SIZE_PREPARE:
NormalizeNumber(Request->ExpectedSize);
- if ((ui64)Request->ExpectedSize > Config.GetMaxMessageSize()) {
+ if ((ui64)Request->ExpectedSize > Context->Config.GetMaxMessageSize()) {
KAFKA_LOG_ERROR("message is big. Size: " << Request->ExpectedSize);
return PassAway();
}
Step = INFLIGTH_CHECK;
case INFLIGTH_CHECK:
- if (InflightSize + Request->ExpectedSize > Config.GetMaxInflightSize()) {
+ if (!Context->Authenticated() && !PendingRequestsQueue.empty()) {
+ // Allow only one message to be processed at a time for non-authenticated users
+ return;
+ }
+ if (InflightSize + Request->ExpectedSize > Context->Config.GetMaxInflightSize()) {
+ // We limit the size of processed messages so as not to exceed the size of available memory
return;
}
InflightSize += Request->ExpectedSize;
diff --git a/ydb/core/kafka_proxy/ut/metarequest_ut.cpp b/ydb/core/kafka_proxy/ut/metarequest_ut.cpp
index c1d76117796..0f54284e8c9 100644
--- a/ydb/core/kafka_proxy/ut/metarequest_ut.cpp
+++ b/ydb/core/kafka_proxy/ut/metarequest_ut.cpp
@@ -19,11 +19,15 @@ Y_UNIT_TEST_SUITE(TMetadataActorTests) {
auto GetEvent(NPersQueue::TTestServer& server, const TActorId& edgeActor, const TVector<TString>& topics) {
NKikimrConfig::TKafkaProxyConfig Config;
- NACLib::TUserToken userToken("root@builtin", {});
auto* runtime = server.CleverServer->GetRuntime();
auto request = GetMetadataRequest(topics);
- auto actorId = runtime->Register(new TKafkaMetadataActor(edgeActor, &userToken, 1, request.Get(), Config));
+
+ auto context = std::make_shared<TContext>(Config);
+ context->ConnectionId = edgeActor;
+ context->UserToken = new NACLib::TUserToken("root@builtin", {});
+
+ auto actorId = runtime->Register(new TKafkaMetadataActor(context, 1, request.Get()));
runtime->EnableScheduleForActor(actorId);
runtime->DispatchEvents();
Cerr << "Wait for response for topics: '";
diff --git a/ydb/services/lib/actors/pq_schema_actor.h b/ydb/services/lib/actors/pq_schema_actor.h
index 5f01f97f944..86a5542edc3 100644
--- a/ydb/services/lib/actors/pq_schema_actor.h
+++ b/ydb/services/lib/actors/pq_schema_actor.h
@@ -526,7 +526,11 @@ namespace NKikimr::NGRpcProxy::V1 {
if (this->ReplyIfNotTopic(ev)) {
return false;
}
- PQGroupInfo = ev->Get()->Request->ResultSet[0].PQGroupInfo;
+
+ auto& item = ev->Get()->Request->ResultSet[0];
+ PQGroupInfo = item.PQGroupInfo;
+ Self = item.Self;
+
return true;
}
@@ -559,6 +563,7 @@ namespace NKikimr::NGRpcProxy::V1 {
protected:
THolder<TEvResponse> Response;
TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TPQGroupInfo> PQGroupInfo;
+ TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TDirEntryInfo> Self;
};
}
diff --git a/ydb/services/persqueue_v1/actors/events.h b/ydb/services/persqueue_v1/actors/events.h
index ee3b7b7dfa6..b2953e7a7f2 100644
--- a/ydb/services/persqueue_v1/actors/events.h
+++ b/ydb/services/persqueue_v1/actors/events.h
@@ -465,7 +465,8 @@ struct TEvPQProxy {
{
TEvPartitionLocationResponse() {}
TVector<TPartitionLocationInfo> Partitions;
- ui64 BalancerTabletId;
+ ui64 SchemeShardId;
+ ui64 PathId;
};
};
diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp
index 07dbae4215f..c3ef4e28cb1 100644
--- a/ydb/services/persqueue_v1/actors/schema_actors.cpp
+++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp
@@ -1415,8 +1415,10 @@ void TPartitionsLocationActor::HandleCacheNavigateResponse(
if (!TBase::HandleCacheNavigateResponseBase(ev)) {
return;
}
+
if (ProcessTablets(PQGroupInfo->Description, this->ActorContext())) {
- Response->BalancerTabletId = BalancerTabletId;
+ Response->PathId = Self->Info.GetPathId();
+ Response->SchemeShardId = Self->Info.GetSchemeshardId();
}
}