diff options
author | tesseract <tesseract@yandex-team.com> | 2023-08-10 12:52:58 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-08-10 13:48:15 +0300 |
commit | 3ab959a0df5defdbcc427cb1484dabc5a0e00321 (patch) | |
tree | 506d20b17cb460dae9a177275a2b8b34770c4f02 | |
parent | 81b823859871b2a055c3b6602a537f4e6a32ace0 (diff) | |
download | ydb-3ab959a0df5defdbcc427cb1484dabc5a0e00321.tar.gz |
Check user permissions for PRODUCE requests
-rw-r--r-- | ydb/core/driver_lib/run/kikimr_services_initializers.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp | 25 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_metadata_actor.h | 7 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp | 84 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_produce_actor.h | 18 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/kafka_connection.cpp | 74 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/ut/metarequest_ut.cpp | 5 |
7 files changed, 139 insertions, 76 deletions
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index f023fb68932..520262927b0 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -2639,7 +2639,7 @@ TIcNodeCacheServiceInitializer::TIcNodeCacheServiceInitializer(const TKikimrRunC void TIcNodeCacheServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) { if (appData->FeatureFlags.GetEnableIcNodeCache()) { setup->LocalServices.emplace_back( - TActorId(), + NIcNodeCache::CreateICNodesInfoCacheServiceId(), TActorSetupCmd(NIcNodeCache::CreateICNodesInfoCacheService(appData->Counters), TMailboxType::HTSwap, appData->UserPoolId) ); diff --git a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp index 7ca829e4e48..9b251733ae9 100644 --- a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp @@ -4,8 +4,12 @@ namespace NKafka { using namespace NKikimr::NGRpcProxy::V1; -NActors::IActor* CreateKafkaMetadataActor(const TActorId parent, const ui64 correlationId, const TMetadataRequestData* message) { - return new TKafkaMetadataActor(parent, correlationId, message); +NActors::IActor* CreateKafkaMetadataActor(const TActorId parent, + const NACLib::TUserToken* userToken, + const ui64 correlationId, + const TMetadataRequestData* message, + const NKikimrConfig::TKafkaProxyConfig& config) { + return new TKafkaMetadataActor(parent, userToken, correlationId, message, config); } void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) { @@ -15,6 +19,8 @@ void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) { Response->Topics[i] = TMetadataResponseData::TMetadataResponseTopic{}; auto& reqTopic = Message->Topics[i]; Response->Topics[i].Name = reqTopic.Name.value_or(""); + Response->ClusterId = "ydb-cluster"; + Response->ControllerId = 1; if (!reqTopic.Name.value_or("")) { AddTopicError(Response->Topics[i], EKafkaErrors::INVALID_TOPIC_EXCEPTION); @@ -31,17 +37,21 @@ void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) { TopicIndexes[child].push_back(i); } Become(&TKafkaMetadataActor::StateWork); + RespondIfRequired(ctx); } TActorId TKafkaMetadataActor::SendTopicRequest(const TMetadataRequestData::TMetadataRequestTopic& topicRequest) { + KAFKA_LOG_D("Describe partitions locations for topic '" << *topicRequest.Name << "' for user '" << UserToken->GetUserSID() << "'"); + TGetPartitionsLocationRequest locationRequest{}; locationRequest.Topic = topicRequest.Name.value(); - //ToDo: Get database? - //ToDo: Authorization? + locationRequest.Token = UserToken->GetSerializedToken(); + locationRequest.Database = "/Root/test"; // TODO + PendingResponses++; - return Register(new TPartitionsLocationActor(locationRequest, SelfId())); + return Register(new TPartitionsLocationActor(locationRequest, SelfId())); } void TKafkaMetadataActor::AddTopicError( @@ -58,13 +68,16 @@ void TKafkaMetadataActor::AddTopicResponse(TMetadataResponseData::TMetadataRespo TMetadataResponseData::TMetadataResponseTopic::PartitionsMeta::ItemType responsePartition; responsePartition.PartitionIndex = part.PartitionId; responsePartition.ErrorCode = NONE_ERROR; + responsePartition.LeaderId = part.NodeId; responsePartition.LeaderEpoch = part.Generation; responsePartition.ReplicaNodes.push_back(part.NodeId); + responsePartition.IsrNodes.push_back(part.NodeId); auto ins = AllClusterNodes.insert(part.NodeId); if (ins.second) { auto broker = TMetadataResponseData::TMetadataResponseBroker{}; broker.NodeId = part.NodeId; broker.Host = part.Hostname; + broker.Port = Config.GetListeningPort(); Response->Brokers.emplace_back(std::move(broker)); } topic.Partitions.emplace_back(std::move(responsePartition)); @@ -104,10 +117,10 @@ void TKafkaMetadataActor::HandleResponse(TEvLocationResponse::TPtr ev, const TAc return RespondIfRequired(ctx); } - //ToDo: Log and proceed on bad iter for (auto index : actorIter->second) { auto& topic = Response->Topics[index]; if (r->Status == Ydb::StatusIds::SUCCESS) { + KAFKA_LOG_D("Describe topic '" << topic.Name << "' location finishied successful"); AddTopicResponse(topic, r); } else { KAFKA_LOG_ERROR("Describe topic '" << topic.Name << "' location finishied with error: Code=" << r->Status << ", Issues=" << r->Issues.ToOneLineString()); diff --git a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h index 64a951ba142..e77617571d7 100644 --- a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h @@ -1,15 +1,18 @@ #include "../kafka_events.h" #include <library/cpp/actors/core/actor_bootstrapped.h> +#include <ydb/library/aclib/aclib.h> #include <ydb/services/persqueue_v1/actors/events.h> namespace NKafka { class TKafkaMetadataActor: public NActors::TActorBootstrapped<TKafkaMetadataActor> { public: - TKafkaMetadataActor(const TActorId& parent, const ui64 correlationId, const TMetadataRequestData* message) + TKafkaMetadataActor(const TActorId& parent, const NACLib::TUserToken* userToken, const ui64 correlationId, const TMetadataRequestData* message, const NKikimrConfig::TKafkaProxyConfig& config) : Parent(parent) + , UserToken(userToken) , CorrelationId(correlationId) , Message(message) + , Config(config) , Response(new TMetadataResponseData()) {} @@ -34,8 +37,10 @@ private: private: const TActorId Parent; + const NACLib::TUserToken* UserToken; const ui64 CorrelationId; const TMetadataRequestData* Message; + const NKikimrConfig::TKafkaProxyConfig& Config; ui64 PendingResponses = 0; diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp index dae7662af62..a039cecb6e5 100644 --- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp @@ -9,12 +9,14 @@ namespace NKafka { static constexpr TDuration WAKEUP_INTERVAL = TDuration::Seconds(1); +static constexpr TDuration TOPIC_OK_EXPIRATION_INTERVAL = TDuration::Minutes(15); static constexpr TDuration TOPIC_NOT_FOUND_EXPIRATION_INTERVAL = TDuration::Seconds(15); +static constexpr TDuration TOPIC_UNATHORIZED_EXPIRATION_INTERVAL = TDuration::Minutes(1); static constexpr TDuration REQUEST_EXPIRATION_INTERVAL = TDuration::Seconds(30); static constexpr TDuration WRITER_EXPIRATION_INTERVAL = TDuration::Minutes(5); -NActors::IActor* CreateKafkaProduceActor(const TActorId parent, const TString& clientDC) { - return new TKafkaProduceActor(parent, clientDC);; +NActors::IActor* CreateKafkaProduceActor(const TActorId parent, const NACLib::TUserToken* userToken, const TString& clientDC) { + return new TKafkaProduceActor(parent, userToken, clientDC); } TString TKafkaProduceActor::LogPrefix() { @@ -71,11 +73,11 @@ void TKafkaProduceActor::PassAway() { } void TKafkaProduceActor::CleanTopics(const TActorContext& ctx) { - const auto expired = ctx.Now() - TOPIC_NOT_FOUND_EXPIRATION_INTERVAL; + const auto now = ctx.Now(); std::map<TString, TTopicInfo> newTopics; for(auto& [topicPath, topicInfo] : Topics) { - if (!topicInfo.NotFound || topicInfo.NotFoundTime > expired) { + if (topicInfo.ExpirationTime > now) { newTopics[topicPath] = std::move(topicInfo); } } @@ -109,6 +111,7 @@ void TKafkaProduceActor::EnqueueRequest(TEvKafka::TEvProduceRequest::TPtr reques } void TKafkaProduceActor::HandleInit(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) { + auto now = ctx.Now(); auto* navigate = ev.Get()->Get()->Request.Get(); for (auto& info : navigate->ResultSet) { if (NSchemeCache::TSchemeCacheNavigate::EStatus::Ok == info.Status) { @@ -117,10 +120,20 @@ void TKafkaProduceActor::HandleInit(TEvTxProxySchemeCache::TEvNavigateKeySetResu TopicsForInitialization.erase(topicPath); auto& topic = Topics[topicPath]; - for(auto& p : info.PQGroupInfo->Description.GetPartitions()) { - topic.partitions[p.GetPartitionId()] = p.GetTabletId(); + + if (info.SecurityObject->CheckAccess(NACLib::EAccessRights::UpdateRow, *UserToken)) { + topic.Status = OK; + topic.ExpirationTime = now + TOPIC_OK_EXPIRATION_INTERVAL; + for(auto& p : info.PQGroupInfo->Description.GetPartitions()) { + topic.partitions[p.GetPartitionId()] = p.GetTabletId(); + } + } else { + KAFKA_LOG_W("Unauthorized PRODUCE to topic '" << topicPath << "'"); + topic.Status = UNAUTHORIZED; + topic.ExpirationTime = now + TOPIC_UNATHORIZED_EXPIRATION_INTERVAL; } + auto pathId = info.TableId.PathId; Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchPathId(pathId)); } @@ -129,8 +142,8 @@ void TKafkaProduceActor::HandleInit(TEvTxProxySchemeCache::TEvNavigateKeySetResu for(auto& topicPath : TopicsForInitialization) { KAFKA_LOG_D("Topic '" << topicPath << "' not found"); auto& topicInfo = Topics[topicPath]; - topicInfo.NotFound = true; - topicInfo.NotFoundTime = ctx.Now(); + topicInfo.Status = NOT_FOUND; + topicInfo.ExpirationTime = now + TOPIC_NOT_FOUND_EXPIRATION_INTERVAL; } TopicsForInitialization.clear(); @@ -155,20 +168,24 @@ void TKafkaProduceActor::Handle(TEvTxProxySchemeCache::TEvWatchNotifyDeleted::TP } auto& topicInfo = Topics[path]; - topicInfo.NotFound = true; - topicInfo.NotFoundTime = ctx.Now(); + topicInfo.Status = NOT_FOUND; + topicInfo.ExpirationTime = ctx.Now() + TOPIC_NOT_FOUND_EXPIRATION_INTERVAL; topicInfo.partitions.clear(); } -void TKafkaProduceActor::Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& /*ctx*/) { +void TKafkaProduceActor::Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx) { auto* e = ev->Get(); auto& path = e->Path; KAFKA_LOG_I("Topic '" << path << "' was updated"); auto& topic = Topics[path]; + if (topic.Status == UNAUTHORIZED) { + return; + } + topic.Status = OK; + topic.ExpirationTime = ctx.Now() + TOPIC_OK_EXPIRATION_INTERVAL; topic.partitions.clear(); for (auto& p : e->Result->GetPathDescription().GetPersQueueGroup().GetPartitions()) { - topic.NotFound = false; topic.partitions[p.GetPartitionId()] = p.GetTabletId(); } } @@ -300,8 +317,8 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest& pendingRequest, const T for(const auto& partitionData : topicData.PartitionData) { const auto partitionId = partitionData.Index; - auto writerId = PartitionWriter(topicPath, partitionId, ctx); - if (writerId) { + auto writer = PartitionWriter(topicPath, partitionId, ctx); + if (OK == writer.first) { auto ownCookie = ++Cookie; auto& cookieInfo = Cookies[ownCookie]; cookieInfo.TopicPath = topicPath; @@ -314,10 +331,17 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest& pendingRequest, const T auto ev = Convert(partitionData, *topicData.Name, ownCookie, ClientDC); - Send(writerId, std::move(ev)); + Send(writer.second, std::move(ev)); } else { auto& result = pendingRequest.Results[position]; - result.ErrorCode = EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION; + switch (writer.first) { + case NOT_FOUND: + result.ErrorCode = EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION; + case UNAUTHORIZED: + result.ErrorCode = EKafkaErrors::TOPIC_AUTHORIZATION_FAILED; + default: + result.ErrorCode = EKafkaErrors::UNKNOWN_SERVER_ERROR; + } } ++position; @@ -497,30 +521,30 @@ void TKafkaProduceActor::ProcessInitializationRequests(const TActorContext& ctx) ctx.Send(MakeSchemeCacheID(), MakeHolder<TEvTxProxySchemeCache::TEvNavigateKeySet>(request.release())); } -TActorId TKafkaProduceActor::PartitionWriter(const TString& topicPath, ui32 partitionId, const TActorContext& ctx) { - auto& partitionWriters = Writers[topicPath]; - auto itp = partitionWriters.find(partitionId); - if (itp != partitionWriters.end()) { - auto& writerInfo = itp->second; - writerInfo.LastAccessed = ctx.Now(); - return writerInfo.ActorId; - } - +std::pair<TKafkaProduceActor::ETopicStatus, TActorId> TKafkaProduceActor::PartitionWriter(const TString& topicPath, ui32 partitionId, const TActorContext& ctx) { auto it = Topics.find(topicPath); if (it == Topics.end()) { KAFKA_LOG_ERROR("Internal error: topic '" << topicPath << "' isn`t initialized"); - return TActorId{}; + return { NOT_FOUND, TActorId{} }; } auto& topicInfo = it->second; - if (topicInfo.NotFound) { - return TActorId{}; + if (topicInfo.Status != OK) { + return { topicInfo.Status, TActorId{} }; + } + + auto& partitionWriters = Writers[topicPath]; + auto itp = partitionWriters.find(partitionId); + if (itp != partitionWriters.end()) { + auto& writerInfo = itp->second; + writerInfo.LastAccessed = ctx.Now(); + return { OK, writerInfo.ActorId }; } auto& partitions = topicInfo.partitions; auto pit = partitions.find(partitionId); if (pit == partitions.end()) { - return TActorId{}; + return { NOT_FOUND, TActorId{} }; } auto tabletId = pit->second; @@ -529,7 +553,7 @@ TActorId TKafkaProduceActor::PartitionWriter(const TString& topicPath, ui32 part auto& writerInfo = partitionWriters[partitionId]; writerInfo.ActorId = ctx.RegisterWithSameMailbox(writerActor); writerInfo.LastAccessed = ctx.Now(); - return writerInfo.ActorId; + return { OK, writerInfo.ActorId }; } } // namespace NKafka diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.h b/ydb/core/kafka_proxy/actors/kafka_produce_actor.h index 43c7a170560..0849cfa3bd7 100644 --- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.h @@ -4,6 +4,7 @@ #include <ydb/core/base/tablet_pipe.h> #include <ydb/core/persqueue/writer/writer.h> #include <ydb/core/tx/scheme_cache/scheme_cache.h> +#include <ydb/library/aclib/aclib.h> #include "../kafka_events.h" @@ -28,9 +29,17 @@ using namespace NKikimrClient; // class TKafkaProduceActor: public NActors::TActorBootstrapped<TKafkaProduceActor> { struct TPendingRequest; + + enum ETopicStatus { + OK, + NOT_FOUND, + UNAUTHORIZED + }; + public: - TKafkaProduceActor(const TActorId& client, const TString& clientDC) + TKafkaProduceActor(const TActorId& client, const NACLib::TUserToken* userToken, const TString& clientDC) : Client(client) + , UserToken(userToken) , ClientDC(clientDC) { } @@ -120,13 +129,14 @@ private: void CleanTopics(const TActorContext& ctx); void CleanWriters(const TActorContext& ctx); - TActorId PartitionWriter(const TString& topicPath, ui32 partitionId, const TActorContext& ctx); + std::pair<ETopicStatus, TActorId> PartitionWriter(const TString& topicPath, ui32 partitionId, const TActorContext& ctx); TString LogPrefix(); void LogEvent(IEventHandle& ev); private: const TActorId Client; + const NACLib::TUserToken* UserToken; TString SourceId; TString ClientDC; @@ -167,8 +177,8 @@ private: std::set<TString> TopicsForInitialization; struct TTopicInfo { - bool NotFound = false; - TInstant NotFoundTime; + ETopicStatus Status = OK; + TInstant ExpirationTime; // partitioId -> tabletId std::unordered_map<ui32, ui64> partitions; diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp index efa5bd2cb18..34a1647283f 100644 --- a/ydb/core/kafka_proxy/kafka_connection.cpp +++ b/ydb/core/kafka_proxy/kafka_connection.cpp @@ -19,8 +19,8 @@ using namespace NKikimr; NActors::IActor* CreateKafkaApiVersionsActor(const TActorId parent, const ui64 correlationId, const TApiVersionsRequestData* message); NActors::IActor* CreateKafkaInitProducerIdActor(const TActorId parent, const ui64 correlationId, const TInitProducerIdRequestData* message); -NActors::IActor* CreateKafkaMetadataActor(const TActorId parent, const ui64 correlationId, const TMetadataRequestData* message); -NActors::IActor* CreateKafkaProduceActor(const TActorId parent, const TString& clientDC); +NActors::IActor* CreateKafkaMetadataActor(const TActorId parent, const NACLib::TUserToken* userToken, const ui64 correlationId, const TMetadataRequestData* message, const NKikimrConfig::TKafkaProxyConfig& config); +NActors::IActor* CreateKafkaProduceActor(const TActorId parent, const NACLib::TUserToken* userToken, const TString& clientDC); NActors::IActor* CreateKafkaSaslAuthActor(const TActorId parent, const ui64 correlationId, const TSaslHandshakeRequestData* message); NActors::IActor* CreateKafkaSaslAuthActor(const TActorId parent, const ui64 correlationId, const NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const TSaslAuthenticateRequestData* message); @@ -77,7 +77,6 @@ public: NAddressClassifier::TLabeledAddressClassifier::TConstPtr DatacenterClassifier; TString ClientDC; - i32 CorrelationId = 1; std::shared_ptr<Msg> Request; std::unordered_map<ui64, Msg::TPtr> PendingRequests; std::deque<Msg::TPtr> PendingRequestsQueue; @@ -106,13 +105,11 @@ public: IsSslSupported = IsSslSupported && Socket->IsSslSupported(); } - void Bootstrap(const TActorContext& ctx) { + void Bootstrap() { Become(&TKafkaConnection::StateAccepting); Schedule(InactivityTimeout, InactivityEvent = new TEvPollerReady(nullptr, false, false)); KAFKA_LOG_I("incoming connection opened " << Address); - ProduceActorId = ctx.RegisterWithSameMailbox(CreateKafkaProduceActor(SelfId(), ClientDC)); - OnAccept(); } @@ -131,7 +128,7 @@ public: protected: void LogEvent(IEventHandle& ev) { - KAFKA_LOG_T("Event: " << ev.GetTypeName()); + KAFKA_LOG_T("Received event: " << ev.GetTypeName()); } void SetNonBlock() noexcept { @@ -197,7 +194,7 @@ protected: switch (ev->GetTypeRewrite()) { hFunc(TEvPollerReady, HandleAccepting); hFunc(TEvPollerRegisterResult, HandleAccepting); - hFunc(TEvKafka::TEvResponse, Handle); + HFunc(TEvKafka::TEvResponse, Handle); default: KAFKA_LOG_ERROR("TKafkaConnection: Unexpected " << ev.Get()->GetTypeName()); } @@ -207,7 +204,15 @@ protected: Register(CreateKafkaApiVersionsActor(SelfId(), header->CorrelationId, message)); } - void HandleMessage(const TRequestHeaderData* header, const TProduceRequestData* message) { + void HandleMessage(const TRequestHeaderData* header, const TProduceRequestData* message, const TActorContext& ctx) { + if (!UserToken) { + KAFKA_LOG_ERROR("Unauthenificated produce"); + PassAway(); + } + if (!ProduceActorId) { + ProduceActorId = ctx.RegisterWithSameMailbox(CreateKafkaProduceActor(SelfId(), UserToken.Get(), ClientDC)); + } + Send(ProduceActorId, new TEvKafka::TEvProduceRequest(header->CorrelationId, message)); } @@ -216,7 +221,7 @@ protected: } void HandleMessage(TRequestHeaderData* header, const TMetadataRequestData* message) { - Register(CreateKafkaMetadataActor(SelfId(), header->CorrelationId, message)); + Register(CreateKafkaMetadataActor(SelfId(), UserToken.Get(), header->CorrelationId, message, Config)); } void HandleMessage(const TRequestHeaderData* header, const TSaslAuthenticateRequestData* message) { @@ -227,7 +232,7 @@ protected: Register(CreateKafkaSaslAuthActor(SelfId(), header->CorrelationId, message)); } - void ProcessRequest() { + void ProcessRequest(const TActorContext& ctx) { KAFKA_LOG_D("process message: ApiKey=" << Request->Header.RequestApiKey << ", ExpectedSize=" << Request->ExpectedSize << ", Size=" << Request->Size); @@ -239,7 +244,7 @@ protected: switch (Request->Header.RequestApiKey) { case PRODUCE: - HandleMessage(&Request->Header, dynamic_cast<TProduceRequestData*>(message)); + HandleMessage(&Request->Header, dynamic_cast<TProduceRequestData*>(message), ctx); return; case API_VERSIONS: @@ -268,16 +273,17 @@ protected: } } - void Handle(TEvKafka::TEvResponse::TPtr response) { + void Handle(TEvKafka::TEvResponse::TPtr response, const TActorContext& ctx) { auto r = response->Get(); - Reply(r->CorrelationId, r->Response); + Reply(r->CorrelationId, r->Response, ctx); } void Handle(TEvKafka::TEvAuthSuccess::TPtr auth) { UserToken = auth->Get()->UserToken; - } + KAFKA_LOG_D("Authentificated successful. SID=" << UserToken->GetUserSID()); + } - void Reply(const ui64 correlationId, TApiMessage::TPtr response) { + void Reply(const ui64 correlationId, TApiMessage::TPtr response, const TActorContext& ctx) { auto it = PendingRequests.find(correlationId); if (it == PendingRequests.end()) { KAFKA_LOG_ERROR("Unexpected correlationId " << correlationId); @@ -290,7 +296,7 @@ protected: ProcessReplyQueue(); - DoRead(); + DoRead(ctx); } void ProcessReplyQueue() { @@ -330,7 +336,7 @@ protected: KAFKA_LOG_D("Sent reply: ApiKey=" << header->RequestApiKey << ", Version=" << version << ", Correlation=" << responseHeader.CorrelationId << ", Size=" << size); } - void DoRead() { + void DoRead(const TActorContext& ctx) { KAFKA_LOG_T("DoRead: Demand=" << Demand.Length << ", Step=" << static_cast<i32>(Step)); for (;;) { @@ -389,11 +395,18 @@ protected: case MESSAGE_PROCESS: TKafkaInt16 apiKey = *(TKafkaInt16*)Request->Buffer.Data(); TKafkaVersion apiVersion = *(TKafkaVersion*)(Request->Buffer.Data() + sizeof(TKafkaInt16)); + TKafkaInt32 correlationId = *(TKafkaInt32*)(Request->Buffer.Data() + sizeof(TKafkaInt16) + sizeof(TKafkaInt16)); NormalizeNumber(apiKey); NormalizeNumber(apiVersion); + NormalizeNumber(correlationId); - KAFKA_LOG_D("received message. ApiKey=" << apiKey << ", Version=" << apiVersion); + KAFKA_LOG_D("received message. ApiKey=" << apiKey << ", Version=" << apiVersion << ", CorrelationId=" << correlationId); + + if (PendingRequests.contains(correlationId)) { + KAFKA_LOG_ERROR("CorrelationId " << correlationId << " already processing"); + return PassAway(); + } // Print("received", Request->Buffer, Request->ExpectedSize); @@ -402,23 +415,18 @@ protected: Request->Message = CreateRequest(apiKey); try { Request->Header.Read(readable, RequestHeaderVersion(apiKey, apiVersion)); - if (Request->Header.CorrelationId != CorrelationId) { - KAFKA_LOG_ERROR("Unexpected correlationId. Expected=" << CorrelationId << ", Received=" << Request->Header.CorrelationId); - return PassAway(); - } Request->Message->Read(readable, apiVersion); - - ++CorrelationId; } catch(const yexception& e) { - KAFKA_LOG_ERROR("error on processing message: ApiKey=" << Request->Header.RequestApiKey - << ", Version=" << Request->Header.RequestApiVersion - << ", Error=" << e.what()); + KAFKA_LOG_ERROR("error on processing message: ApiKey=" << apiKey + << ", Version=" << apiVersion + << ", CorrelationId=" << correlationId + << ", Error=" << e.what()); return PassAway(); } Step = SIZE_READ; - ProcessRequest(); + ProcessRequest(ctx); break; } @@ -426,9 +434,9 @@ protected: } } - void HandleConnected(TEvPollerReady::TPtr event) { + void HandleConnected(TEvPollerReady::TPtr event, const TActorContext& ctx) { if (event->Get()->Read) { - DoRead(); + DoRead(ctx); if (event->Get() == InactivityEvent) { const TDuration passed = TDuration::Seconds(std::abs(InactivityTimer.Passed())); @@ -460,9 +468,9 @@ protected: STATEFN(StateConnected) { LogEvent(*ev.Get()); switch (ev->GetTypeRewrite()) { - hFunc(TEvPollerReady, HandleConnected); + HFunc(TEvPollerReady, HandleConnected); hFunc(TEvPollerRegisterResult, HandleConnected); - hFunc(TEvKafka::TEvResponse, Handle); + HFunc(TEvKafka::TEvResponse, Handle); hFunc(TEvKafka::TEvAuthSuccess, Handle); default: KAFKA_LOG_ERROR("TKafkaConnection: Unexpected " << ev.Get()->GetTypeName()); diff --git a/ydb/core/kafka_proxy/ut/metarequest_ut.cpp b/ydb/core/kafka_proxy/ut/metarequest_ut.cpp index ee4dc59590b..c1d76117796 100644 --- a/ydb/core/kafka_proxy/ut/metarequest_ut.cpp +++ b/ydb/core/kafka_proxy/ut/metarequest_ut.cpp @@ -18,9 +18,12 @@ Y_UNIT_TEST_SUITE(TMetadataActorTests) { } auto GetEvent(NPersQueue::TTestServer& server, const TActorId& edgeActor, const TVector<TString>& topics) { + NKikimrConfig::TKafkaProxyConfig Config; + NACLib::TUserToken userToken("root@builtin", {}); + auto* runtime = server.CleverServer->GetRuntime(); auto request = GetMetadataRequest(topics); - auto actorId = runtime->Register(new TKafkaMetadataActor(edgeActor, 1, request.Get())); + auto actorId = runtime->Register(new TKafkaMetadataActor(edgeActor, &userToken, 1, request.Get(), Config)); runtime->EnableScheduleForActor(actorId); runtime->DispatchEvents(); Cerr << "Wait for response for topics: '"; |