diff options
author | tesseract <tesseract@yandex-team.com> | 2023-08-07 15:31:30 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-08-07 16:42:40 +0300 |
commit | f785ad0ebb0c4618aa237124e6ad8f68dd622023 (patch) | |
tree | 538f8abb4714948cccee0c466c42e9bd9a45b2d2 | |
parent | 0dca84682c20215363efe0441ca2b46f6d2e6c81 (diff) | |
download | ydb-f785ad0ebb0c4618aa237124e6ad8f68dd622023.tar.gz |
Respond to requests in the order they are received
+ extract request handlers and move its to actors directory
+ check incomming correlation id
+ use correlationId instead cookie in kafka requests
16 files changed, 255 insertions, 124 deletions
diff --git a/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt index d80040e4c5..4fe0387ca6 100644 --- a/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt @@ -20,10 +20,12 @@ target_link_libraries(ydb-core-kafka_proxy PUBLIC ydb-services-persqueue_v1 ) target_sources(ydb-core-kafka_proxy PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_produce_actor.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_metadata_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp ) diff --git a/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt b/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt index df99502c2a..2acbd612c9 100644 --- a/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt @@ -21,10 +21,12 @@ target_link_libraries(ydb-core-kafka_proxy PUBLIC ydb-services-persqueue_v1 ) target_sources(ydb-core-kafka_proxy PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_produce_actor.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_metadata_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp ) diff --git a/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt index df99502c2a..2acbd612c9 100644 --- a/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt @@ -21,10 +21,12 @@ target_link_libraries(ydb-core-kafka_proxy PUBLIC ydb-services-persqueue_v1 ) target_sources(ydb-core-kafka_proxy PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_produce_actor.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_metadata_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp ) diff --git a/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt index d80040e4c5..4fe0387ca6 100644 --- a/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt @@ -20,10 +20,12 @@ target_link_libraries(ydb-core-kafka_proxy PUBLIC ydb-services-persqueue_v1 ) target_sources(ydb-core-kafka_proxy PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_produce_actor.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_metadata_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp ) diff --git a/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp new file mode 100644 index 0000000000..4acdd053b2 --- /dev/null +++ b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp @@ -0,0 +1,39 @@ +#include "kafka_api_versions_actor.h" + +namespace NKafka { + +NActors::IActor* CreateKafkaApiVersionsActor(const TActorId parent, const ui64 correlationId, const TApiVersionsRequestData* message) { + return new TKafkaApiVersionsActor(parent, correlationId, message); +} + +TApiVersionsResponseData::TPtr GetApiVersions() { + TApiVersionsResponseData::TPtr response = std::make_shared<TApiVersionsResponseData>(); + response->ApiKeys.resize(4); + + response->ApiKeys[0].ApiKey = PRODUCE; + response->ApiKeys[0].MinVersion = 3; // From version 3 record batch format is 2. Supported only 2th batch format. + response->ApiKeys[0].MaxVersion = TProduceRequestData::MessageMeta::PresentVersions.Max; + + response->ApiKeys[1].ApiKey = API_VERSIONS; + response->ApiKeys[1].MinVersion = TApiVersionsRequestData::MessageMeta::PresentVersions.Min; + response->ApiKeys[1].MaxVersion = TApiVersionsRequestData::MessageMeta::PresentVersions.Max; + + response->ApiKeys[2].ApiKey = METADATA; + response->ApiKeys[2].MinVersion = TMetadataRequestData::MessageMeta::PresentVersions.Min; + response->ApiKeys[2].MaxVersion = TMetadataRequestData::MessageMeta::PresentVersions.Max; + + response->ApiKeys[3].ApiKey = INIT_PRODUCER_ID; + response->ApiKeys[3].MinVersion = TInitProducerIdRequestData::MessageMeta::PresentVersions.Min; + response->ApiKeys[3].MaxVersion = TInitProducerIdRequestData::MessageMeta::PresentVersions.Max; + + return response; +} + +void TKafkaApiVersionsActor::Bootstrap(const NActors::TActorContext& ctx) { + Y_UNUSED(Message); + + Send(Parent, new TEvKafka::TEvResponse(CorrelationId, GetApiVersions())); + Die(ctx); +} + +} diff --git a/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.h b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.h new file mode 100644 index 0000000000..4a00822b86 --- /dev/null +++ b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.h @@ -0,0 +1,22 @@ +#include "../kafka_events.h" +#include <library/cpp/actors/core/actor_bootstrapped.h> + +namespace NKafka { + +class TKafkaApiVersionsActor: public NActors::TActorBootstrapped<TKafkaApiVersionsActor> { +public: + TKafkaApiVersionsActor(const TActorId parent, const ui64 correlationId, const TApiVersionsRequestData* message) + : Parent(parent) + , CorrelationId(correlationId) + , Message(message) { + } + + void Bootstrap(const NActors::TActorContext& ctx); + +private: + const TActorId Parent; + const ui64 CorrelationId; + const TApiVersionsRequestData* Message; +}; + +} // NKafka diff --git a/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp new file mode 100644 index 0000000000..065158c16a --- /dev/null +++ b/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp @@ -0,0 +1,27 @@ +#include "kafka_init_producer_id_actor.h" + +namespace NKafka { + +NActors::IActor* CreateKafkaInitProducerIdActor(const TActorId parent, const ui64 correlationId, const TInitProducerIdRequestData* message) { + return new TKafkaInitProducerIdActor(parent, correlationId, message); +} + +TInitProducerIdResponseData::TPtr GetResponse() { + TInitProducerIdResponseData::TPtr response = std::make_shared<TInitProducerIdResponseData>(); + + response->ProducerEpoch = 1; + response->ProducerId = 1; + response->ErrorCode = 0; + response->ThrottleTimeMs = 0; + + return response; +} + +void TKafkaInitProducerIdActor::Bootstrap(const NActors::TActorContext& ctx) { + Y_UNUSED(Message); + + Send(Parent, new TEvKafka::TEvResponse(CorrelationId, GetResponse())); + Die(ctx); +} + +} diff --git a/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.h b/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.h new file mode 100644 index 0000000000..78e5622d2a --- /dev/null +++ b/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.h @@ -0,0 +1,22 @@ +#include "../kafka_events.h" +#include <library/cpp/actors/core/actor_bootstrapped.h> + +namespace NKafka { + +class TKafkaInitProducerIdActor: public NActors::TActorBootstrapped<TKafkaInitProducerIdActor> { +public: + TKafkaInitProducerIdActor(const TActorId parent, const ui64 correlationId, const TInitProducerIdRequestData* message) + : Parent(parent) + , CorrelationId(correlationId) + , Message(message) { + } + + void Bootstrap(const NActors::TActorContext& ctx); + +private: + const TActorId Parent; + const ui64 CorrelationId; + const TInitProducerIdRequestData* Message; +}; + +} // NKafka diff --git a/ydb/core/kafka_proxy/kafka_metadata_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp index e74540860c..ec57a6fd41 100644 --- a/ydb/core/kafka_proxy/kafka_metadata_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp @@ -4,6 +4,10 @@ namespace NKafka { using namespace NKikimr::NGRpcProxy::V1; +NActors::IActor* CreateKafkaMetadataActor(const TActorId parent, const ui64 correlationId, const TMetadataRequestData* message) { + return new TKafkaMetadataActor(parent, correlationId, message); +} + void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) { Response->Topics.resize(Message->Topics.size()); THashMap<TString, TActorId> partitionActors; @@ -112,7 +116,7 @@ void TKafkaMetadataActor::HandleResponse(TEvLocationResponse::TPtr ev, const TAc void TKafkaMetadataActor::RespondIfRequired(const TActorContext& ctx) { if (PendingResponses == 0) { - Send(Parent, new TEvKafka::TEvResponse(Cookie, Response)); + Send(Parent, new TEvKafka::TEvResponse(CorrelationId, Response)); Die(ctx); } } diff --git a/ydb/core/kafka_proxy/kafka_metadata_actor.h b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h index 67f6c64a96..f99c245159 100644 --- a/ydb/core/kafka_proxy/kafka_metadata_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h @@ -1,4 +1,4 @@ -#include "kafka_events.h" +#include "../kafka_events.h" #include <library/cpp/actors/core/actor_bootstrapped.h> #include <ydb/services/persqueue_v1/actors/events.h> @@ -6,9 +6,9 @@ namespace NKafka { class TKafkaMetadataActor: public NActors::TActorBootstrapped<TKafkaMetadataActor> { public: - TKafkaMetadataActor(ui64 cookie, TMetadataRequestData* message, const TActorId& parent) - : Cookie(cookie) - , Parent(parent) + TKafkaMetadataActor(const TActorId& parent, const ui64 correlationId, const TMetadataRequestData* message) + : Parent(parent) + , CorrelationId(correlationId) , Message(message) , Response(new TMetadataResponseData()) {} @@ -29,9 +29,10 @@ private: HFunc(TEvLocationResponse, HandleResponse); } } - ui64 Cookie; - TActorId Parent; - TMetadataRequestData* Message; + const TActorId Parent; + const ui64 CorrelationId; + const TMetadataRequestData* Message; + ui64 PendingResponses = 0; TMetadataResponseData::TPtr Response; diff --git a/ydb/core/kafka_proxy/kafka_produce_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp index 8a3ff23d28..b5c99bc227 100644 --- a/ydb/core/kafka_proxy/kafka_produce_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp @@ -12,6 +12,10 @@ static constexpr TDuration TOPIC_NOT_FOUND_EXPIRATION_INTERVAL = TDuration::Seco static constexpr TDuration REQUEST_EXPIRATION_INTERVAL = TDuration::Seconds(30); static constexpr TDuration WRITER_EXPIRATION_INTERVAL = TDuration::Minutes(5); +NActors::IActor* CreateKafkaProduceActor(const TActorId parent, const TString& clientDC) { + return new TKafkaProduceActor(parent, clientDC);; +} + TString TKafkaProduceActor::LogPrefix() { TStringBuilder sb; sb << "TKafkaProduceActor " << SelfId() << " State: "; @@ -398,9 +402,9 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) { } auto* r = pendingRequest.Request->Get()->Request; - auto cookie = pendingRequest.Request->Get()->Cookie; + auto correlationId = pendingRequest.Request->Get()->CorrelationId; - KAFKA_LOG_D("Send result for cookie " << cookie << ". Expired=" << expired); + KAFKA_LOG_D("Send result for correlationId " << correlationId << ". Expired=" << expired); const auto topicsCount = r->TopicData.size(); auto response = std::make_shared<TProduceResponseData>(); @@ -445,7 +449,7 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) { } } - Send(Client, new TEvKafka::TEvResponse(cookie, response)); + Send(Client, new TEvKafka::TEvResponse(correlationId, response)); if (!pendingRequest.WaitAcceptingCookies.empty()) { if (!expired) { diff --git a/ydb/core/kafka_proxy/kafka_produce_actor.h b/ydb/core/kafka_proxy/actors/kafka_produce_actor.h index c618da5494..43c7a17056 100644 --- a/ydb/core/kafka_proxy/kafka_produce_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.h @@ -5,7 +5,7 @@ #include <ydb/core/persqueue/writer/writer.h> #include <ydb/core/tx/scheme_cache/scheme_cache.h> -#include "kafka_events.h" +#include "../kafka_events.h" namespace NKafka { diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp index e486e457c7..cf19589853 100644 --- a/ydb/core/kafka_proxy/kafka_connection.cpp +++ b/ydb/core/kafka_proxy/kafka_connection.cpp @@ -5,8 +5,6 @@ #include "kafka_connection.h" #include "kafka_events.h" #include "kafka_messages.h" -#include "kafka_produce_actor.h" -#include "kafka_metadata_actor.h" #include "kafka_log_impl.h" #include <strstream> @@ -18,6 +16,11 @@ namespace NKafka { using namespace NActors; using namespace NKikimr; +NActors::IActor* CreateKafkaApiVersionsActor(const TActorId parent, const ui64 correlationId, const TApiVersionsRequestData* message); +NActors::IActor* CreateKafkaInitProducerIdActor(const TActorId parent, const ui64 correlationId, const TInitProducerIdRequestData* message); +NActors::IActor* CreateKafkaMetadataActor(const TActorId parent, const ui64 correlationId, const TMetadataRequestData* message); +NActors::IActor* CreateKafkaProduceActor(const TActorId parent, const TString& clientDC); + char Hex(const unsigned char c) { return c < 10 ? '0' + c : 'A' + c - 10; } @@ -34,41 +37,21 @@ void Print(const TString& marker, TBuffer& buffer, ssize_t length) { KAFKA_LOG_ERROR("Packet " << marker << ": " << sb); } -TApiVersionsResponseData GetApiVersions() { - TApiVersionsResponseData response; - response.ApiKeys.resize(4); - - response.ApiKeys[0].ApiKey = PRODUCE; - response.ApiKeys[0].MinVersion = 3; // From version 3 record batch format is 2. Supported only 2 batch format. - response.ApiKeys[0].MaxVersion = TProduceRequestData::MessageMeta::PresentVersions.Max; - - response.ApiKeys[1].ApiKey = API_VERSIONS; - response.ApiKeys[1].MinVersion = TApiVersionsRequestData::MessageMeta::PresentVersions.Min; - response.ApiKeys[1].MaxVersion = TApiVersionsRequestData::MessageMeta::PresentVersions.Max; - - response.ApiKeys[2].ApiKey = METADATA; - response.ApiKeys[2].MinVersion = TMetadataRequestData::MessageMeta::PresentVersions.Min; - response.ApiKeys[2].MaxVersion = TMetadataRequestData::MessageMeta::PresentVersions.Max; - - response.ApiKeys[3].ApiKey = INIT_PRODUCER_ID; - response.ApiKeys[3].MinVersion = TInitProducerIdRequestData::MessageMeta::PresentVersions.Min; - response.ApiKeys[3].MaxVersion = TInitProducerIdRequestData::MessageMeta::PresentVersions.Max; - - return response; -} - -static const TApiVersionsResponseData KAFKA_API_VERSIONS = GetApiVersions(); - class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNetworkConfig { public: using TBase = TActorBootstrapped<TKafkaConnection>; struct Msg { + using TPtr=std::shared_ptr<Msg>; + size_t Size = 0; TKafkaInt32 ExpectedSize = 0; TBuffer Buffer; + TRequestHeaderData Header; - std::unique_ptr<TMessage> Message; + std::unique_ptr<TApiMessage> Message; + + TApiMessage::TPtr Response; }; static constexpr TDuration InactivityTimeout = TDuration::Minutes(10); @@ -90,7 +73,10 @@ public: NAddressClassifier::TLabeledAddressClassifier::TConstPtr DatacenterClassifier; TString ClientDC; - Msg Request; + i32 CorrelationId = 0; + std::shared_ptr<Msg> Request; + std::unordered_map<ui64, Msg::TPtr> PendingRequests; + std::deque<Msg::TPtr> PendingRequestsQueue; enum EReadSteps { SIZE_READ, SIZE_PREPARE, INFLIGTH_CHECK, MESSAGE_READ, MESSAGE_PROCESS }; EReadSteps Step; @@ -101,7 +87,6 @@ public: TActorId ProduceActorId; - std::unordered_map<ui64, Msg> PendingRequests; TKafkaConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address, const NKikimrConfig::TKafkaProxyConfig& config) @@ -120,7 +105,7 @@ public: Schedule(InactivityTimeout, InactivityEvent = new TEvPollerReady(nullptr, false, false)); KAFKA_LOG_I("incoming connection opened " << Address); - ProduceActorId = ctx.RegisterWithSameMailbox(new TKafkaProduceActor(SelfId(), ClientDC)); + ProduceActorId = ctx.RegisterWithSameMailbox(CreateKafkaProduceActor(SelfId(), ClientDC)); OnAccept(); } @@ -212,81 +197,92 @@ protected: } } - void HandleMessage(TRequestHeaderData* header, TApiVersionsRequestData* /*message*/, size_t messageSize) { - Reply(header, &KAFKA_API_VERSIONS); - - InflightSize -= messageSize; + void HandleMessage(TRequestHeaderData* header, const TApiVersionsRequestData* message) { + Register(CreateKafkaApiVersionsActor(SelfId(), header->CorrelationId, message)); } - void HandleMessage(const TRequestHeaderData* header, const TProduceRequestData* message, size_t /*messageSize*/) { - PendingRequests[header->CorrelationId] = std::move(Request); + void HandleMessage(const TRequestHeaderData* header, const TProduceRequestData* message) { Send(ProduceActorId, new TEvKafka::TEvProduceRequest(header->CorrelationId, message)); } - void Handle(TEvKafka::TEvResponse::TPtr response) { - auto r = response->Get(); - Reply(r->Cookie, r->Response.get()); + void HandleMessage(const TRequestHeaderData* header, const TInitProducerIdRequestData* message) { + Register(CreateKafkaInitProducerIdActor(SelfId(), header->CorrelationId, message)); } - void HandleMessage(const TRequestHeaderData* header, const TInitProducerIdRequestData* /*message*/, size_t messageSize) { - TInitProducerIdResponseData response; - response.ProducerEpoch = 1; - response.ProducerId = 1; - response.ErrorCode = 0; - response.ThrottleTimeMs = 0; + void HandleMessage(TRequestHeaderData* header, const TMetadataRequestData* message) { + Register(CreateKafkaMetadataActor(SelfId(), header->CorrelationId, message)); + } - Reply(header, &response); + void ProcessRequest() { + KAFKA_LOG_D("process message: ApiKey=" << Request->Header.RequestApiKey << ", ExpectedSize=" << Request->ExpectedSize + << ", Size=" << Request->Size); - InflightSize -= messageSize; - } + Msg::TPtr r = Request; + PendingRequestsQueue.push_back(r); + PendingRequests[r->Header.CorrelationId] = r; - void HandleMessage(TRequestHeaderData* header, TMetadataRequestData* message, size_t /*messageSize*/) { - PendingRequests[header->CorrelationId] = std::move(Request); - Register(new TKafkaMetadataActor(header->CorrelationId, message, SelfId())); - } + TApiMessage* message = Request->Message.get(); - void ProcessRequest() { - KAFKA_LOG_D("process message: ApiKey=" << Request.Header.RequestApiKey << ", ExpectedSize=" << Request.ExpectedSize - << ", Size=" << Request.Size); - switch (Request.Header.RequestApiKey) { + switch (Request->Header.RequestApiKey) { case PRODUCE: - HandleMessage(&Request.Header, dynamic_cast<TProduceRequestData*>(Request.Message.get()), Request.ExpectedSize); + HandleMessage(&Request->Header, dynamic_cast<TProduceRequestData*>(message)); return; case API_VERSIONS: - HandleMessage(&Request.Header, dynamic_cast<TApiVersionsRequestData*>(Request.Message.get()), Request.ExpectedSize); + HandleMessage(&Request->Header, dynamic_cast<TApiVersionsRequestData*>(message)); return; case INIT_PRODUCER_ID: - HandleMessage(&Request.Header, dynamic_cast<TInitProducerIdRequestData*>(Request.Message.get()), Request.ExpectedSize); + HandleMessage(&Request->Header, dynamic_cast<TInitProducerIdRequestData*>(message)); return; case METADATA: - HandleMessage(&Request.Header, dynamic_cast<TMetadataRequestData*>(Request.Message.get()), Request.ExpectedSize); + HandleMessage(&Request->Header, dynamic_cast<TMetadataRequestData*>(message)); return; default: - KAFKA_LOG_ERROR("Unsupported message: ApiKey=" << Request.Header.RequestApiKey); + KAFKA_LOG_ERROR("Unsupported message: ApiKey=" << Request->Header.RequestApiKey); + PassAway(); } } - void Reply(const ui64 cookie, const TApiMessage* response) { - auto it = PendingRequests.find(cookie); + void Handle(TEvKafka::TEvResponse::TPtr response) { + auto r = response->Get(); + Reply(r->CorrelationId, r->Response); + } + + void Reply(const ui64 correlationId, TApiMessage::TPtr response) { + auto it = PendingRequests.find(correlationId); if (it == PendingRequests.end()) { - KAFKA_LOG_ERROR("Unexpected cookie " << cookie); + KAFKA_LOG_ERROR("Unexpected correlationId " << correlationId); return; } - auto& request = it->second; - Reply(&request.Header, response); - - InflightSize -= request.ExpectedSize; + auto request = it->second; + request->Response = response; + request->Buffer.Clear(); - PendingRequests.erase(it); + ProcessReplyQueue(); DoRead(); } + void ProcessReplyQueue() { + while(!PendingRequestsQueue.empty()) { + auto& request = PendingRequestsQueue.front(); + if (request->Response.get() == nullptr) { + break; + } + + Reply(&request->Header, request->Response.get()); + + InflightSize -= request->ExpectedSize; + + PendingRequests.erase(request->Header.CorrelationId); + PendingRequestsQueue.pop_front(); + } + } + void Reply(const TRequestHeaderData* header, const TApiMessage* reply) { TKafkaVersion headerVersion = ResponseHeaderVersion(header->RequestApiKey, header->RequestApiVersion); TKafkaVersion version = header->RequestApiVersion; @@ -328,69 +324,76 @@ protected: } received = res; - Request.Size += received; + Request->Size += received; Demand.Buffer += received; Demand.Length -= received; } if (!Demand) { switch (Step) { case SIZE_READ: - Demand = TReadDemand((char*)&(Request.ExpectedSize), sizeof(Request.ExpectedSize)); + Request = std::make_unique<Msg>(); + Demand = TReadDemand((char*)&(Request->ExpectedSize), sizeof(Request->ExpectedSize)); Step = SIZE_PREPARE; break; case SIZE_PREPARE: - NormalizeNumber(Request.ExpectedSize); - if ((ui64)Request.ExpectedSize > Config.GetMaxMessageSize()) { - KAFKA_LOG_ERROR("message is big. Size: " << Request.ExpectedSize); + NormalizeNumber(Request->ExpectedSize); + if ((ui64)Request->ExpectedSize > Config.GetMaxMessageSize()) { + KAFKA_LOG_ERROR("message is big. Size: " << Request->ExpectedSize); return PassAway(); } Step = INFLIGTH_CHECK; case INFLIGTH_CHECK: - if (InflightSize + Request.ExpectedSize > Config.GetMaxInflightSize()) { + if (InflightSize + Request->ExpectedSize > Config.GetMaxInflightSize()) { return; } - InflightSize += Request.ExpectedSize; + InflightSize += Request->ExpectedSize; Step = MESSAGE_READ; case MESSAGE_READ: - KAFKA_LOG_T("start read new message. ExpectedSize=" << Request.ExpectedSize); + KAFKA_LOG_T("start read new message. ExpectedSize=" << Request->ExpectedSize); - Request.Buffer.Resize(Request.ExpectedSize); - Demand = TReadDemand(Request.Buffer.Data(), Request.ExpectedSize); + Request->Buffer.Resize(Request->ExpectedSize); + Demand = TReadDemand(Request->Buffer.Data(), Request->ExpectedSize); Step = MESSAGE_PROCESS; break; case MESSAGE_PROCESS: - TKafkaInt16 apiKey = *(TKafkaInt16*)Request.Buffer.Data(); - TKafkaVersion apiVersion = *(TKafkaVersion*)(Request.Buffer.Data() + sizeof(TKafkaInt16)); + TKafkaInt16 apiKey = *(TKafkaInt16*)Request->Buffer.Data(); + TKafkaVersion apiVersion = *(TKafkaVersion*)(Request->Buffer.Data() + sizeof(TKafkaInt16)); NormalizeNumber(apiKey); NormalizeNumber(apiVersion); - KAFKA_LOG_D("received message. ApiKey=" << Request.Header.RequestApiKey - << ", Version=" << Request.Header.RequestApiVersion); + KAFKA_LOG_D("received message. ApiKey=" << apiKey << ", Version=" << apiVersion); - // Print("received", Request.Buffer, Request.ExpectedSize); + // Print("received", Request->Buffer, Request->ExpectedSize); - TKafkaReadable readable(Request.Buffer); + TKafkaReadable readable(Request->Buffer); - Request.Message = CreateRequest(apiKey); + Request->Message = CreateRequest(apiKey); try { - Request.Header.Read(readable, RequestHeaderVersion(apiKey, apiVersion)); - Request.Message->Read(readable, apiVersion); + Request->Header.Read(readable, RequestHeaderVersion(apiKey, apiVersion)); + if (Request->Header.CorrelationId != CorrelationId) { + KAFKA_LOG_ERROR("Unexpected correlationId. Expected=" << CorrelationId << ", Received=" << Request->Header.CorrelationId); + return PassAway(); + } + Request->Message->Read(readable, apiVersion); + + ++CorrelationId; } catch(const yexception& e) { - KAFKA_LOG_ERROR("error on processing message: ApiKey=" << Request.Header.RequestApiKey - << ", Version=" << Request.Header.RequestApiVersion + KAFKA_LOG_ERROR("error on processing message: ApiKey=" << Request->Header.RequestApiKey + << ", Version=" << Request->Header.RequestApiVersion << ", Error=" << e.what()); - return PassAway(); + return PassAway(); } + Step = SIZE_READ; + ProcessRequest(); - Step = SIZE_READ; break; } } diff --git a/ydb/core/kafka_proxy/kafka_events.h b/ydb/core/kafka_proxy/kafka_events.h index f674fe9804..cf9bcfadd6 100644 --- a/ydb/core/kafka_proxy/kafka_events.h +++ b/ydb/core/kafka_proxy/kafka_events.h @@ -25,22 +25,22 @@ struct TEvKafka { struct TEvProduceRequest : public TEventLocal<TEvProduceRequest, EvProduceRequest> { - TEvProduceRequest(const ui64 cookie, const TProduceRequestData* request) - : Cookie(cookie) + TEvProduceRequest(const ui64 correlationId, const TProduceRequestData* request) + : CorrelationId(correlationId) , Request(request) {} - ui64 Cookie; + ui64 CorrelationId; const TProduceRequestData* Request; }; struct TEvResponse : public TEventLocal<TEvResponse, EvResponse> { - TEvResponse(const ui64 cookie, const TApiMessage::TPtr response) - : Cookie(cookie) + TEvResponse(const ui64 correlationId, const TApiMessage::TPtr response) + : CorrelationId(correlationId) , Response(std::move(response)) { } - ui64 Cookie; + const ui64 CorrelationId; const TApiMessage::TPtr Response; }; diff --git a/ydb/core/kafka_proxy/ut/metarequest_ut.cpp b/ydb/core/kafka_proxy/ut/metarequest_ut.cpp index c7cbb25e88..cb8da65f67 100644 --- a/ydb/core/kafka_proxy/ut/metarequest_ut.cpp +++ b/ydb/core/kafka_proxy/ut/metarequest_ut.cpp @@ -1,7 +1,7 @@ #include <library/cpp/testing/unittest/registar.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h> #include <ydb/core/kafka_proxy/kafka_events.h> -#include <ydb/core/kafka_proxy/kafka_metadata_actor.h> +#include <ydb/core/kafka_proxy/actors/kafka_metadata_actor.h> namespace NKafka::NTests { @@ -20,7 +20,7 @@ Y_UNIT_TEST_SUITE(TMetadataActorTests) { auto GetEvent(NPersQueue::TTestServer& server, const TActorId& edgeActor, const TVector<TString>& topics) { auto* runtime = server.CleverServer->GetRuntime(); auto request = GetMetadataRequest(topics); - auto actorId = runtime->Register(new TKafkaMetadataActor(1, request.Get(), edgeActor)); + auto actorId = runtime->Register(new TKafkaMetadataActor(edgeActor, 1, request.Get())); runtime->EnableScheduleForActor(actorId); runtime->DispatchEvents(); Cerr << "Wait for response for topics: '"; diff --git a/ydb/core/kafka_proxy/ya.make b/ydb/core/kafka_proxy/ya.make index 326d8f9080..03fab884a5 100644 --- a/ydb/core/kafka_proxy/ya.make +++ b/ydb/core/kafka_proxy/ya.make @@ -1,6 +1,10 @@ LIBRARY() SRCS( + actors/kafka_api_versions_actor.cpp + actors/kafka_init_producer_id_actor.cpp + actors/kafka_metadata_actor.cpp + actors/kafka_produce_actor.cpp kafka_connection.cpp kafka_connection.h kafka_listener.h @@ -11,9 +15,6 @@ SRCS( kafka_messages.h kafka_messages_int.cpp kafka_messages_int.h - kafka_produce_actor.cpp - kafka_metadata_actor.cpp - kafka_produce_actor.h kafka_proxy.h kafka_records.cpp ) |