aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-08-10 12:52:58 +0300
committertesseract <tesseract@yandex-team.com>2023-08-10 13:48:15 +0300
commit3ab959a0df5defdbcc427cb1484dabc5a0e00321 (patch)
tree506d20b17cb460dae9a177275a2b8b34770c4f02
parent81b823859871b2a055c3b6602a537f4e6a32ace0 (diff)
downloadydb-3ab959a0df5defdbcc427cb1484dabc5a0e00321.tar.gz
Check user permissions for PRODUCE requests
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp2
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp25
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_metadata_actor.h7
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp84
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_produce_actor.h18
-rw-r--r--ydb/core/kafka_proxy/kafka_connection.cpp74
-rw-r--r--ydb/core/kafka_proxy/ut/metarequest_ut.cpp5
7 files changed, 139 insertions, 76 deletions
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index f023fb68932..520262927b0 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -2639,7 +2639,7 @@ TIcNodeCacheServiceInitializer::TIcNodeCacheServiceInitializer(const TKikimrRunC
void TIcNodeCacheServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) {
if (appData->FeatureFlags.GetEnableIcNodeCache()) {
setup->LocalServices.emplace_back(
- TActorId(),
+ NIcNodeCache::CreateICNodesInfoCacheServiceId(),
TActorSetupCmd(NIcNodeCache::CreateICNodesInfoCacheService(appData->Counters),
TMailboxType::HTSwap, appData->UserPoolId)
);
diff --git a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
index 7ca829e4e48..9b251733ae9 100644
--- a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
@@ -4,8 +4,12 @@
namespace NKafka {
using namespace NKikimr::NGRpcProxy::V1;
-NActors::IActor* CreateKafkaMetadataActor(const TActorId parent, const ui64 correlationId, const TMetadataRequestData* message) {
- return new TKafkaMetadataActor(parent, correlationId, message);
+NActors::IActor* CreateKafkaMetadataActor(const TActorId parent,
+ const NACLib::TUserToken* userToken,
+ const ui64 correlationId,
+ const TMetadataRequestData* message,
+ const NKikimrConfig::TKafkaProxyConfig& config) {
+ return new TKafkaMetadataActor(parent, userToken, correlationId, message, config);
}
void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) {
@@ -15,6 +19,8 @@ void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) {
Response->Topics[i] = TMetadataResponseData::TMetadataResponseTopic{};
auto& reqTopic = Message->Topics[i];
Response->Topics[i].Name = reqTopic.Name.value_or("");
+ Response->ClusterId = "ydb-cluster";
+ Response->ControllerId = 1;
if (!reqTopic.Name.value_or("")) {
AddTopicError(Response->Topics[i], EKafkaErrors::INVALID_TOPIC_EXCEPTION);
@@ -31,17 +37,21 @@ void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) {
TopicIndexes[child].push_back(i);
}
Become(&TKafkaMetadataActor::StateWork);
+
RespondIfRequired(ctx);
}
TActorId TKafkaMetadataActor::SendTopicRequest(const TMetadataRequestData::TMetadataRequestTopic& topicRequest) {
+ KAFKA_LOG_D("Describe partitions locations for topic '" << *topicRequest.Name << "' for user '" << UserToken->GetUserSID() << "'");
+
TGetPartitionsLocationRequest locationRequest{};
locationRequest.Topic = topicRequest.Name.value();
- //ToDo: Get database?
- //ToDo: Authorization?
+ locationRequest.Token = UserToken->GetSerializedToken();
+ locationRequest.Database = "/Root/test"; // TODO
+
PendingResponses++;
- return Register(new TPartitionsLocationActor(locationRequest, SelfId()));
+ return Register(new TPartitionsLocationActor(locationRequest, SelfId()));
}
void TKafkaMetadataActor::AddTopicError(
@@ -58,13 +68,16 @@ void TKafkaMetadataActor::AddTopicResponse(TMetadataResponseData::TMetadataRespo
TMetadataResponseData::TMetadataResponseTopic::PartitionsMeta::ItemType responsePartition;
responsePartition.PartitionIndex = part.PartitionId;
responsePartition.ErrorCode = NONE_ERROR;
+ responsePartition.LeaderId = part.NodeId;
responsePartition.LeaderEpoch = part.Generation;
responsePartition.ReplicaNodes.push_back(part.NodeId);
+ responsePartition.IsrNodes.push_back(part.NodeId);
auto ins = AllClusterNodes.insert(part.NodeId);
if (ins.second) {
auto broker = TMetadataResponseData::TMetadataResponseBroker{};
broker.NodeId = part.NodeId;
broker.Host = part.Hostname;
+ broker.Port = Config.GetListeningPort();
Response->Brokers.emplace_back(std::move(broker));
}
topic.Partitions.emplace_back(std::move(responsePartition));
@@ -104,10 +117,10 @@ void TKafkaMetadataActor::HandleResponse(TEvLocationResponse::TPtr ev, const TAc
return RespondIfRequired(ctx);
}
- //ToDo: Log and proceed on bad iter
for (auto index : actorIter->second) {
auto& topic = Response->Topics[index];
if (r->Status == Ydb::StatusIds::SUCCESS) {
+ KAFKA_LOG_D("Describe topic '" << topic.Name << "' location finishied successful");
AddTopicResponse(topic, r);
} else {
KAFKA_LOG_ERROR("Describe topic '" << topic.Name << "' location finishied with error: Code=" << r->Status << ", Issues=" << r->Issues.ToOneLineString());
diff --git a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h
index 64a951ba142..e77617571d7 100644
--- a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h
+++ b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h
@@ -1,15 +1,18 @@
#include "../kafka_events.h"
#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <ydb/library/aclib/aclib.h>
#include <ydb/services/persqueue_v1/actors/events.h>
namespace NKafka {
class TKafkaMetadataActor: public NActors::TActorBootstrapped<TKafkaMetadataActor> {
public:
- TKafkaMetadataActor(const TActorId& parent, const ui64 correlationId, const TMetadataRequestData* message)
+ TKafkaMetadataActor(const TActorId& parent, const NACLib::TUserToken* userToken, const ui64 correlationId, const TMetadataRequestData* message, const NKikimrConfig::TKafkaProxyConfig& config)
: Parent(parent)
+ , UserToken(userToken)
, CorrelationId(correlationId)
, Message(message)
+ , Config(config)
, Response(new TMetadataResponseData())
{}
@@ -34,8 +37,10 @@ private:
private:
const TActorId Parent;
+ const NACLib::TUserToken* UserToken;
const ui64 CorrelationId;
const TMetadataRequestData* Message;
+ const NKikimrConfig::TKafkaProxyConfig& Config;
ui64 PendingResponses = 0;
diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
index dae7662af62..a039cecb6e5 100644
--- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
@@ -9,12 +9,14 @@
namespace NKafka {
static constexpr TDuration WAKEUP_INTERVAL = TDuration::Seconds(1);
+static constexpr TDuration TOPIC_OK_EXPIRATION_INTERVAL = TDuration::Minutes(15);
static constexpr TDuration TOPIC_NOT_FOUND_EXPIRATION_INTERVAL = TDuration::Seconds(15);
+static constexpr TDuration TOPIC_UNATHORIZED_EXPIRATION_INTERVAL = TDuration::Minutes(1);
static constexpr TDuration REQUEST_EXPIRATION_INTERVAL = TDuration::Seconds(30);
static constexpr TDuration WRITER_EXPIRATION_INTERVAL = TDuration::Minutes(5);
-NActors::IActor* CreateKafkaProduceActor(const TActorId parent, const TString& clientDC) {
- return new TKafkaProduceActor(parent, clientDC);;
+NActors::IActor* CreateKafkaProduceActor(const TActorId parent, const NACLib::TUserToken* userToken, const TString& clientDC) {
+ return new TKafkaProduceActor(parent, userToken, clientDC);
}
TString TKafkaProduceActor::LogPrefix() {
@@ -71,11 +73,11 @@ void TKafkaProduceActor::PassAway() {
}
void TKafkaProduceActor::CleanTopics(const TActorContext& ctx) {
- const auto expired = ctx.Now() - TOPIC_NOT_FOUND_EXPIRATION_INTERVAL;
+ const auto now = ctx.Now();
std::map<TString, TTopicInfo> newTopics;
for(auto& [topicPath, topicInfo] : Topics) {
- if (!topicInfo.NotFound || topicInfo.NotFoundTime > expired) {
+ if (topicInfo.ExpirationTime > now) {
newTopics[topicPath] = std::move(topicInfo);
}
}
@@ -109,6 +111,7 @@ void TKafkaProduceActor::EnqueueRequest(TEvKafka::TEvProduceRequest::TPtr reques
}
void TKafkaProduceActor::HandleInit(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
+ auto now = ctx.Now();
auto* navigate = ev.Get()->Get()->Request.Get();
for (auto& info : navigate->ResultSet) {
if (NSchemeCache::TSchemeCacheNavigate::EStatus::Ok == info.Status) {
@@ -117,10 +120,20 @@ void TKafkaProduceActor::HandleInit(TEvTxProxySchemeCache::TEvNavigateKeySetResu
TopicsForInitialization.erase(topicPath);
auto& topic = Topics[topicPath];
- for(auto& p : info.PQGroupInfo->Description.GetPartitions()) {
- topic.partitions[p.GetPartitionId()] = p.GetTabletId();
+
+ if (info.SecurityObject->CheckAccess(NACLib::EAccessRights::UpdateRow, *UserToken)) {
+ topic.Status = OK;
+ topic.ExpirationTime = now + TOPIC_OK_EXPIRATION_INTERVAL;
+ for(auto& p : info.PQGroupInfo->Description.GetPartitions()) {
+ topic.partitions[p.GetPartitionId()] = p.GetTabletId();
+ }
+ } else {
+ KAFKA_LOG_W("Unauthorized PRODUCE to topic '" << topicPath << "'");
+ topic.Status = UNAUTHORIZED;
+ topic.ExpirationTime = now + TOPIC_UNATHORIZED_EXPIRATION_INTERVAL;
}
+
auto pathId = info.TableId.PathId;
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchPathId(pathId));
}
@@ -129,8 +142,8 @@ void TKafkaProduceActor::HandleInit(TEvTxProxySchemeCache::TEvNavigateKeySetResu
for(auto& topicPath : TopicsForInitialization) {
KAFKA_LOG_D("Topic '" << topicPath << "' not found");
auto& topicInfo = Topics[topicPath];
- topicInfo.NotFound = true;
- topicInfo.NotFoundTime = ctx.Now();
+ topicInfo.Status = NOT_FOUND;
+ topicInfo.ExpirationTime = now + TOPIC_NOT_FOUND_EXPIRATION_INTERVAL;
}
TopicsForInitialization.clear();
@@ -155,20 +168,24 @@ void TKafkaProduceActor::Handle(TEvTxProxySchemeCache::TEvWatchNotifyDeleted::TP
}
auto& topicInfo = Topics[path];
- topicInfo.NotFound = true;
- topicInfo.NotFoundTime = ctx.Now();
+ topicInfo.Status = NOT_FOUND;
+ topicInfo.ExpirationTime = ctx.Now() + TOPIC_NOT_FOUND_EXPIRATION_INTERVAL;
topicInfo.partitions.clear();
}
-void TKafkaProduceActor::Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& /*ctx*/) {
+void TKafkaProduceActor::Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx) {
auto* e = ev->Get();
auto& path = e->Path;
KAFKA_LOG_I("Topic '" << path << "' was updated");
auto& topic = Topics[path];
+ if (topic.Status == UNAUTHORIZED) {
+ return;
+ }
+ topic.Status = OK;
+ topic.ExpirationTime = ctx.Now() + TOPIC_OK_EXPIRATION_INTERVAL;
topic.partitions.clear();
for (auto& p : e->Result->GetPathDescription().GetPersQueueGroup().GetPartitions()) {
- topic.NotFound = false;
topic.partitions[p.GetPartitionId()] = p.GetTabletId();
}
}
@@ -300,8 +317,8 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest& pendingRequest, const T
for(const auto& partitionData : topicData.PartitionData) {
const auto partitionId = partitionData.Index;
- auto writerId = PartitionWriter(topicPath, partitionId, ctx);
- if (writerId) {
+ auto writer = PartitionWriter(topicPath, partitionId, ctx);
+ if (OK == writer.first) {
auto ownCookie = ++Cookie;
auto& cookieInfo = Cookies[ownCookie];
cookieInfo.TopicPath = topicPath;
@@ -314,10 +331,17 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest& pendingRequest, const T
auto ev = Convert(partitionData, *topicData.Name, ownCookie, ClientDC);
- Send(writerId, std::move(ev));
+ Send(writer.second, std::move(ev));
} else {
auto& result = pendingRequest.Results[position];
- result.ErrorCode = EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION;
+ switch (writer.first) {
+ case NOT_FOUND:
+ result.ErrorCode = EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION;
+ case UNAUTHORIZED:
+ result.ErrorCode = EKafkaErrors::TOPIC_AUTHORIZATION_FAILED;
+ default:
+ result.ErrorCode = EKafkaErrors::UNKNOWN_SERVER_ERROR;
+ }
}
++position;
@@ -497,30 +521,30 @@ void TKafkaProduceActor::ProcessInitializationRequests(const TActorContext& ctx)
ctx.Send(MakeSchemeCacheID(), MakeHolder<TEvTxProxySchemeCache::TEvNavigateKeySet>(request.release()));
}
-TActorId TKafkaProduceActor::PartitionWriter(const TString& topicPath, ui32 partitionId, const TActorContext& ctx) {
- auto& partitionWriters = Writers[topicPath];
- auto itp = partitionWriters.find(partitionId);
- if (itp != partitionWriters.end()) {
- auto& writerInfo = itp->second;
- writerInfo.LastAccessed = ctx.Now();
- return writerInfo.ActorId;
- }
-
+std::pair<TKafkaProduceActor::ETopicStatus, TActorId> TKafkaProduceActor::PartitionWriter(const TString& topicPath, ui32 partitionId, const TActorContext& ctx) {
auto it = Topics.find(topicPath);
if (it == Topics.end()) {
KAFKA_LOG_ERROR("Internal error: topic '" << topicPath << "' isn`t initialized");
- return TActorId{};
+ return { NOT_FOUND, TActorId{} };
}
auto& topicInfo = it->second;
- if (topicInfo.NotFound) {
- return TActorId{};
+ if (topicInfo.Status != OK) {
+ return { topicInfo.Status, TActorId{} };
+ }
+
+ auto& partitionWriters = Writers[topicPath];
+ auto itp = partitionWriters.find(partitionId);
+ if (itp != partitionWriters.end()) {
+ auto& writerInfo = itp->second;
+ writerInfo.LastAccessed = ctx.Now();
+ return { OK, writerInfo.ActorId };
}
auto& partitions = topicInfo.partitions;
auto pit = partitions.find(partitionId);
if (pit == partitions.end()) {
- return TActorId{};
+ return { NOT_FOUND, TActorId{} };
}
auto tabletId = pit->second;
@@ -529,7 +553,7 @@ TActorId TKafkaProduceActor::PartitionWriter(const TString& topicPath, ui32 part
auto& writerInfo = partitionWriters[partitionId];
writerInfo.ActorId = ctx.RegisterWithSameMailbox(writerActor);
writerInfo.LastAccessed = ctx.Now();
- return writerInfo.ActorId;
+ return { OK, writerInfo.ActorId };
}
} // namespace NKafka
diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.h b/ydb/core/kafka_proxy/actors/kafka_produce_actor.h
index 43c7a170560..0849cfa3bd7 100644
--- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.h
+++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.h
@@ -4,6 +4,7 @@
#include <ydb/core/base/tablet_pipe.h>
#include <ydb/core/persqueue/writer/writer.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
+#include <ydb/library/aclib/aclib.h>
#include "../kafka_events.h"
@@ -28,9 +29,17 @@ using namespace NKikimrClient;
//
class TKafkaProduceActor: public NActors::TActorBootstrapped<TKafkaProduceActor> {
struct TPendingRequest;
+
+ enum ETopicStatus {
+ OK,
+ NOT_FOUND,
+ UNAUTHORIZED
+ };
+
public:
- TKafkaProduceActor(const TActorId& client, const TString& clientDC)
+ TKafkaProduceActor(const TActorId& client, const NACLib::TUserToken* userToken, const TString& clientDC)
: Client(client)
+ , UserToken(userToken)
, ClientDC(clientDC) {
}
@@ -120,13 +129,14 @@ private:
void CleanTopics(const TActorContext& ctx);
void CleanWriters(const TActorContext& ctx);
- TActorId PartitionWriter(const TString& topicPath, ui32 partitionId, const TActorContext& ctx);
+ std::pair<ETopicStatus, TActorId> PartitionWriter(const TString& topicPath, ui32 partitionId, const TActorContext& ctx);
TString LogPrefix();
void LogEvent(IEventHandle& ev);
private:
const TActorId Client;
+ const NACLib::TUserToken* UserToken;
TString SourceId;
TString ClientDC;
@@ -167,8 +177,8 @@ private:
std::set<TString> TopicsForInitialization;
struct TTopicInfo {
- bool NotFound = false;
- TInstant NotFoundTime;
+ ETopicStatus Status = OK;
+ TInstant ExpirationTime;
// partitioId -> tabletId
std::unordered_map<ui32, ui64> partitions;
diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp
index efa5bd2cb18..34a1647283f 100644
--- a/ydb/core/kafka_proxy/kafka_connection.cpp
+++ b/ydb/core/kafka_proxy/kafka_connection.cpp
@@ -19,8 +19,8 @@ using namespace NKikimr;
NActors::IActor* CreateKafkaApiVersionsActor(const TActorId parent, const ui64 correlationId, const TApiVersionsRequestData* message);
NActors::IActor* CreateKafkaInitProducerIdActor(const TActorId parent, const ui64 correlationId, const TInitProducerIdRequestData* message);
-NActors::IActor* CreateKafkaMetadataActor(const TActorId parent, const ui64 correlationId, const TMetadataRequestData* message);
-NActors::IActor* CreateKafkaProduceActor(const TActorId parent, const TString& clientDC);
+NActors::IActor* CreateKafkaMetadataActor(const TActorId parent, const NACLib::TUserToken* userToken, const ui64 correlationId, const TMetadataRequestData* message, const NKikimrConfig::TKafkaProxyConfig& config);
+NActors::IActor* CreateKafkaProduceActor(const TActorId parent, const NACLib::TUserToken* userToken, const TString& clientDC);
NActors::IActor* CreateKafkaSaslAuthActor(const TActorId parent, const ui64 correlationId, const TSaslHandshakeRequestData* message);
NActors::IActor* CreateKafkaSaslAuthActor(const TActorId parent, const ui64 correlationId, const NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const TSaslAuthenticateRequestData* message);
@@ -77,7 +77,6 @@ public:
NAddressClassifier::TLabeledAddressClassifier::TConstPtr DatacenterClassifier;
TString ClientDC;
- i32 CorrelationId = 1;
std::shared_ptr<Msg> Request;
std::unordered_map<ui64, Msg::TPtr> PendingRequests;
std::deque<Msg::TPtr> PendingRequestsQueue;
@@ -106,13 +105,11 @@ public:
IsSslSupported = IsSslSupported && Socket->IsSslSupported();
}
- void Bootstrap(const TActorContext& ctx) {
+ void Bootstrap() {
Become(&TKafkaConnection::StateAccepting);
Schedule(InactivityTimeout, InactivityEvent = new TEvPollerReady(nullptr, false, false));
KAFKA_LOG_I("incoming connection opened " << Address);
- ProduceActorId = ctx.RegisterWithSameMailbox(CreateKafkaProduceActor(SelfId(), ClientDC));
-
OnAccept();
}
@@ -131,7 +128,7 @@ public:
protected:
void LogEvent(IEventHandle& ev) {
- KAFKA_LOG_T("Event: " << ev.GetTypeName());
+ KAFKA_LOG_T("Received event: " << ev.GetTypeName());
}
void SetNonBlock() noexcept {
@@ -197,7 +194,7 @@ protected:
switch (ev->GetTypeRewrite()) {
hFunc(TEvPollerReady, HandleAccepting);
hFunc(TEvPollerRegisterResult, HandleAccepting);
- hFunc(TEvKafka::TEvResponse, Handle);
+ HFunc(TEvKafka::TEvResponse, Handle);
default:
KAFKA_LOG_ERROR("TKafkaConnection: Unexpected " << ev.Get()->GetTypeName());
}
@@ -207,7 +204,15 @@ protected:
Register(CreateKafkaApiVersionsActor(SelfId(), header->CorrelationId, message));
}
- void HandleMessage(const TRequestHeaderData* header, const TProduceRequestData* message) {
+ void HandleMessage(const TRequestHeaderData* header, const TProduceRequestData* message, const TActorContext& ctx) {
+ if (!UserToken) {
+ KAFKA_LOG_ERROR("Unauthenificated produce");
+ PassAway();
+ }
+ if (!ProduceActorId) {
+ ProduceActorId = ctx.RegisterWithSameMailbox(CreateKafkaProduceActor(SelfId(), UserToken.Get(), ClientDC));
+ }
+
Send(ProduceActorId, new TEvKafka::TEvProduceRequest(header->CorrelationId, message));
}
@@ -216,7 +221,7 @@ protected:
}
void HandleMessage(TRequestHeaderData* header, const TMetadataRequestData* message) {
- Register(CreateKafkaMetadataActor(SelfId(), header->CorrelationId, message));
+ Register(CreateKafkaMetadataActor(SelfId(), UserToken.Get(), header->CorrelationId, message, Config));
}
void HandleMessage(const TRequestHeaderData* header, const TSaslAuthenticateRequestData* message) {
@@ -227,7 +232,7 @@ protected:
Register(CreateKafkaSaslAuthActor(SelfId(), header->CorrelationId, message));
}
- void ProcessRequest() {
+ void ProcessRequest(const TActorContext& ctx) {
KAFKA_LOG_D("process message: ApiKey=" << Request->Header.RequestApiKey << ", ExpectedSize=" << Request->ExpectedSize
<< ", Size=" << Request->Size);
@@ -239,7 +244,7 @@ protected:
switch (Request->Header.RequestApiKey) {
case PRODUCE:
- HandleMessage(&Request->Header, dynamic_cast<TProduceRequestData*>(message));
+ HandleMessage(&Request->Header, dynamic_cast<TProduceRequestData*>(message), ctx);
return;
case API_VERSIONS:
@@ -268,16 +273,17 @@ protected:
}
}
- void Handle(TEvKafka::TEvResponse::TPtr response) {
+ void Handle(TEvKafka::TEvResponse::TPtr response, const TActorContext& ctx) {
auto r = response->Get();
- Reply(r->CorrelationId, r->Response);
+ Reply(r->CorrelationId, r->Response, ctx);
}
void Handle(TEvKafka::TEvAuthSuccess::TPtr auth) {
UserToken = auth->Get()->UserToken;
- }
+ KAFKA_LOG_D("Authentificated successful. SID=" << UserToken->GetUserSID());
+ }
- void Reply(const ui64 correlationId, TApiMessage::TPtr response) {
+ void Reply(const ui64 correlationId, TApiMessage::TPtr response, const TActorContext& ctx) {
auto it = PendingRequests.find(correlationId);
if (it == PendingRequests.end()) {
KAFKA_LOG_ERROR("Unexpected correlationId " << correlationId);
@@ -290,7 +296,7 @@ protected:
ProcessReplyQueue();
- DoRead();
+ DoRead(ctx);
}
void ProcessReplyQueue() {
@@ -330,7 +336,7 @@ protected:
KAFKA_LOG_D("Sent reply: ApiKey=" << header->RequestApiKey << ", Version=" << version << ", Correlation=" << responseHeader.CorrelationId << ", Size=" << size);
}
- void DoRead() {
+ void DoRead(const TActorContext& ctx) {
KAFKA_LOG_T("DoRead: Demand=" << Demand.Length << ", Step=" << static_cast<i32>(Step));
for (;;) {
@@ -389,11 +395,18 @@ protected:
case MESSAGE_PROCESS:
TKafkaInt16 apiKey = *(TKafkaInt16*)Request->Buffer.Data();
TKafkaVersion apiVersion = *(TKafkaVersion*)(Request->Buffer.Data() + sizeof(TKafkaInt16));
+ TKafkaInt32 correlationId = *(TKafkaInt32*)(Request->Buffer.Data() + sizeof(TKafkaInt16) + sizeof(TKafkaInt16));
NormalizeNumber(apiKey);
NormalizeNumber(apiVersion);
+ NormalizeNumber(correlationId);
- KAFKA_LOG_D("received message. ApiKey=" << apiKey << ", Version=" << apiVersion);
+ KAFKA_LOG_D("received message. ApiKey=" << apiKey << ", Version=" << apiVersion << ", CorrelationId=" << correlationId);
+
+ if (PendingRequests.contains(correlationId)) {
+ KAFKA_LOG_ERROR("CorrelationId " << correlationId << " already processing");
+ return PassAway();
+ }
// Print("received", Request->Buffer, Request->ExpectedSize);
@@ -402,23 +415,18 @@ protected:
Request->Message = CreateRequest(apiKey);
try {
Request->Header.Read(readable, RequestHeaderVersion(apiKey, apiVersion));
- if (Request->Header.CorrelationId != CorrelationId) {
- KAFKA_LOG_ERROR("Unexpected correlationId. Expected=" << CorrelationId << ", Received=" << Request->Header.CorrelationId);
- return PassAway();
- }
Request->Message->Read(readable, apiVersion);
-
- ++CorrelationId;
} catch(const yexception& e) {
- KAFKA_LOG_ERROR("error on processing message: ApiKey=" << Request->Header.RequestApiKey
- << ", Version=" << Request->Header.RequestApiVersion
- << ", Error=" << e.what());
+ KAFKA_LOG_ERROR("error on processing message: ApiKey=" << apiKey
+ << ", Version=" << apiVersion
+ << ", CorrelationId=" << correlationId
+ << ", Error=" << e.what());
return PassAway();
}
Step = SIZE_READ;
- ProcessRequest();
+ ProcessRequest(ctx);
break;
}
@@ -426,9 +434,9 @@ protected:
}
}
- void HandleConnected(TEvPollerReady::TPtr event) {
+ void HandleConnected(TEvPollerReady::TPtr event, const TActorContext& ctx) {
if (event->Get()->Read) {
- DoRead();
+ DoRead(ctx);
if (event->Get() == InactivityEvent) {
const TDuration passed = TDuration::Seconds(std::abs(InactivityTimer.Passed()));
@@ -460,9 +468,9 @@ protected:
STATEFN(StateConnected) {
LogEvent(*ev.Get());
switch (ev->GetTypeRewrite()) {
- hFunc(TEvPollerReady, HandleConnected);
+ HFunc(TEvPollerReady, HandleConnected);
hFunc(TEvPollerRegisterResult, HandleConnected);
- hFunc(TEvKafka::TEvResponse, Handle);
+ HFunc(TEvKafka::TEvResponse, Handle);
hFunc(TEvKafka::TEvAuthSuccess, Handle);
default:
KAFKA_LOG_ERROR("TKafkaConnection: Unexpected " << ev.Get()->GetTypeName());
diff --git a/ydb/core/kafka_proxy/ut/metarequest_ut.cpp b/ydb/core/kafka_proxy/ut/metarequest_ut.cpp
index ee4dc59590b..c1d76117796 100644
--- a/ydb/core/kafka_proxy/ut/metarequest_ut.cpp
+++ b/ydb/core/kafka_proxy/ut/metarequest_ut.cpp
@@ -18,9 +18,12 @@ Y_UNIT_TEST_SUITE(TMetadataActorTests) {
}
auto GetEvent(NPersQueue::TTestServer& server, const TActorId& edgeActor, const TVector<TString>& topics) {
+ NKikimrConfig::TKafkaProxyConfig Config;
+ NACLib::TUserToken userToken("root@builtin", {});
+
auto* runtime = server.CleverServer->GetRuntime();
auto request = GetMetadataRequest(topics);
- auto actorId = runtime->Register(new TKafkaMetadataActor(edgeActor, 1, request.Get()));
+ auto actorId = runtime->Register(new TKafkaMetadataActor(edgeActor, &userToken, 1, request.Get(), Config));
runtime->EnableScheduleForActor(actorId);
runtime->DispatchEvents();
Cerr << "Wait for response for topics: '";