aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-08-07 15:31:30 +0300
committertesseract <tesseract@yandex-team.com>2023-08-07 16:42:40 +0300
commitf785ad0ebb0c4618aa237124e6ad8f68dd622023 (patch)
tree538f8abb4714948cccee0c466c42e9bd9a45b2d2
parent0dca84682c20215363efe0441ca2b46f6d2e6c81 (diff)
downloadydb-f785ad0ebb0c4618aa237124e6ad8f68dd622023.tar.gz
Respond to requests in the order they are received
+ extract request handlers and move them to the actors directory + check incoming correlation id + use correlationId instead of cookie in kafka requests
-rw-r--r--ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt6
-rw-r--r--ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt6
-rw-r--r--ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt6
-rw-r--r--ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt6
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp39
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_api_versions_actor.h22
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp27
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.h22
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp (renamed from ydb/core/kafka_proxy/kafka_metadata_actor.cpp)6
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_metadata_actor.h (renamed from ydb/core/kafka_proxy/kafka_metadata_actor.h)15
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp (renamed from ydb/core/kafka_proxy/kafka_produce_actor.cpp)10
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_produce_actor.h (renamed from ydb/core/kafka_proxy/kafka_produce_actor.h)2
-rw-r--r--ydb/core/kafka_proxy/kafka_connection.cpp189
-rw-r--r--ydb/core/kafka_proxy/kafka_events.h12
-rw-r--r--ydb/core/kafka_proxy/ut/metarequest_ut.cpp4
-rw-r--r--ydb/core/kafka_proxy/ya.make7
16 files changed, 255 insertions, 124 deletions
diff --git a/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt
index d80040e4c5..4fe0387ca6 100644
--- a/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt
@@ -20,10 +20,12 @@ target_link_libraries(ydb-core-kafka_proxy PUBLIC
ydb-services-persqueue_v1
)
target_sources(ydb-core-kafka_proxy PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_produce_actor.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_metadata_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp
)
diff --git a/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt b/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt
index df99502c2a..2acbd612c9 100644
--- a/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt
@@ -21,10 +21,12 @@ target_link_libraries(ydb-core-kafka_proxy PUBLIC
ydb-services-persqueue_v1
)
target_sources(ydb-core-kafka_proxy PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_produce_actor.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_metadata_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp
)
diff --git a/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt
index df99502c2a..2acbd612c9 100644
--- a/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt
@@ -21,10 +21,12 @@ target_link_libraries(ydb-core-kafka_proxy PUBLIC
ydb-services-persqueue_v1
)
target_sources(ydb-core-kafka_proxy PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_produce_actor.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_metadata_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp
)
diff --git a/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt
index d80040e4c5..4fe0387ca6 100644
--- a/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt
@@ -20,10 +20,12 @@ target_link_libraries(ydb-core-kafka_proxy PUBLIC
ydb-services-persqueue_v1
)
target_sources(ydb-core-kafka_proxy PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_produce_actor.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_metadata_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp
)
diff --git a/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp
new file mode 100644
index 0000000000..4acdd053b2
--- /dev/null
+++ b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp
@@ -0,0 +1,39 @@
+#include "kafka_api_versions_actor.h"
+
+namespace NKafka {
+
+NActors::IActor* CreateKafkaApiVersionsActor(const TActorId parent, const ui64 correlationId, const TApiVersionsRequestData* message) {
+ return new TKafkaApiVersionsActor(parent, correlationId, message);
+}
+
+TApiVersionsResponseData::TPtr GetApiVersions() {
+ TApiVersionsResponseData::TPtr response = std::make_shared<TApiVersionsResponseData>();
+ response->ApiKeys.resize(4);
+
+ response->ApiKeys[0].ApiKey = PRODUCE;
+ response->ApiKeys[0].MinVersion = 3; // From version 3 record batch format is 2. Supported only 2th batch format.
+ response->ApiKeys[0].MaxVersion = TProduceRequestData::MessageMeta::PresentVersions.Max;
+
+ response->ApiKeys[1].ApiKey = API_VERSIONS;
+ response->ApiKeys[1].MinVersion = TApiVersionsRequestData::MessageMeta::PresentVersions.Min;
+ response->ApiKeys[1].MaxVersion = TApiVersionsRequestData::MessageMeta::PresentVersions.Max;
+
+ response->ApiKeys[2].ApiKey = METADATA;
+ response->ApiKeys[2].MinVersion = TMetadataRequestData::MessageMeta::PresentVersions.Min;
+ response->ApiKeys[2].MaxVersion = TMetadataRequestData::MessageMeta::PresentVersions.Max;
+
+ response->ApiKeys[3].ApiKey = INIT_PRODUCER_ID;
+ response->ApiKeys[3].MinVersion = TInitProducerIdRequestData::MessageMeta::PresentVersions.Min;
+ response->ApiKeys[3].MaxVersion = TInitProducerIdRequestData::MessageMeta::PresentVersions.Max;
+
+ return response;
+}
+
+void TKafkaApiVersionsActor::Bootstrap(const NActors::TActorContext& ctx) {
+ Y_UNUSED(Message);
+
+ Send(Parent, new TEvKafka::TEvResponse(CorrelationId, GetApiVersions()));
+ Die(ctx);
+}
+
+}
diff --git a/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.h b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.h
new file mode 100644
index 0000000000..4a00822b86
--- /dev/null
+++ b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.h
@@ -0,0 +1,22 @@
+#include "../kafka_events.h"
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+
+namespace NKafka {
+
+class TKafkaApiVersionsActor: public NActors::TActorBootstrapped<TKafkaApiVersionsActor> {
+public:
+ TKafkaApiVersionsActor(const TActorId parent, const ui64 correlationId, const TApiVersionsRequestData* message)
+ : Parent(parent)
+ , CorrelationId(correlationId)
+ , Message(message) {
+ }
+
+ void Bootstrap(const NActors::TActorContext& ctx);
+
+private:
+ const TActorId Parent;
+ const ui64 CorrelationId;
+ const TApiVersionsRequestData* Message;
+};
+
+} // NKafka
diff --git a/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp
new file mode 100644
index 0000000000..065158c16a
--- /dev/null
+++ b/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp
@@ -0,0 +1,27 @@
+#include "kafka_init_producer_id_actor.h"
+
+namespace NKafka {
+
+NActors::IActor* CreateKafkaInitProducerIdActor(const TActorId parent, const ui64 correlationId, const TInitProducerIdRequestData* message) {
+ return new TKafkaInitProducerIdActor(parent, correlationId, message);
+}
+
+TInitProducerIdResponseData::TPtr GetResponse() {
+ TInitProducerIdResponseData::TPtr response = std::make_shared<TInitProducerIdResponseData>();
+
+ response->ProducerEpoch = 1;
+ response->ProducerId = 1;
+ response->ErrorCode = 0;
+ response->ThrottleTimeMs = 0;
+
+ return response;
+}
+
+void TKafkaInitProducerIdActor::Bootstrap(const NActors::TActorContext& ctx) {
+ Y_UNUSED(Message);
+
+ Send(Parent, new TEvKafka::TEvResponse(CorrelationId, GetResponse()));
+ Die(ctx);
+}
+
+}
diff --git a/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.h b/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.h
new file mode 100644
index 0000000000..78e5622d2a
--- /dev/null
+++ b/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.h
@@ -0,0 +1,22 @@
+#include "../kafka_events.h"
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+
+namespace NKafka {
+
+class TKafkaInitProducerIdActor: public NActors::TActorBootstrapped<TKafkaInitProducerIdActor> {
+public:
+ TKafkaInitProducerIdActor(const TActorId parent, const ui64 correlationId, const TInitProducerIdRequestData* message)
+ : Parent(parent)
+ , CorrelationId(correlationId)
+ , Message(message) {
+ }
+
+ void Bootstrap(const NActors::TActorContext& ctx);
+
+private:
+ const TActorId Parent;
+ const ui64 CorrelationId;
+ const TInitProducerIdRequestData* Message;
+};
+
+} // NKafka
diff --git a/ydb/core/kafka_proxy/kafka_metadata_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
index e74540860c..ec57a6fd41 100644
--- a/ydb/core/kafka_proxy/kafka_metadata_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
@@ -4,6 +4,10 @@
namespace NKafka {
using namespace NKikimr::NGRpcProxy::V1;
+NActors::IActor* CreateKafkaMetadataActor(const TActorId parent, const ui64 correlationId, const TMetadataRequestData* message) {
+ return new TKafkaMetadataActor(parent, correlationId, message);
+}
+
void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) {
Response->Topics.resize(Message->Topics.size());
THashMap<TString, TActorId> partitionActors;
@@ -112,7 +116,7 @@ void TKafkaMetadataActor::HandleResponse(TEvLocationResponse::TPtr ev, const TAc
void TKafkaMetadataActor::RespondIfRequired(const TActorContext& ctx) {
if (PendingResponses == 0) {
- Send(Parent, new TEvKafka::TEvResponse(Cookie, Response));
+ Send(Parent, new TEvKafka::TEvResponse(CorrelationId, Response));
Die(ctx);
}
}
diff --git a/ydb/core/kafka_proxy/kafka_metadata_actor.h b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h
index 67f6c64a96..f99c245159 100644
--- a/ydb/core/kafka_proxy/kafka_metadata_actor.h
+++ b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h
@@ -1,4 +1,4 @@
-#include "kafka_events.h"
+#include "../kafka_events.h"
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <ydb/services/persqueue_v1/actors/events.h>
@@ -6,9 +6,9 @@ namespace NKafka {
class TKafkaMetadataActor: public NActors::TActorBootstrapped<TKafkaMetadataActor> {
public:
- TKafkaMetadataActor(ui64 cookie, TMetadataRequestData* message, const TActorId& parent)
- : Cookie(cookie)
- , Parent(parent)
+ TKafkaMetadataActor(const TActorId& parent, const ui64 correlationId, const TMetadataRequestData* message)
+ : Parent(parent)
+ , CorrelationId(correlationId)
, Message(message)
, Response(new TMetadataResponseData())
{}
@@ -29,9 +29,10 @@ private:
HFunc(TEvLocationResponse, HandleResponse);
}
}
- ui64 Cookie;
- TActorId Parent;
- TMetadataRequestData* Message;
+ const TActorId Parent;
+ const ui64 CorrelationId;
+ const TMetadataRequestData* Message;
+
ui64 PendingResponses = 0;
TMetadataResponseData::TPtr Response;
diff --git a/ydb/core/kafka_proxy/kafka_produce_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
index 8a3ff23d28..b5c99bc227 100644
--- a/ydb/core/kafka_proxy/kafka_produce_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
@@ -12,6 +12,10 @@ static constexpr TDuration TOPIC_NOT_FOUND_EXPIRATION_INTERVAL = TDuration::Seco
static constexpr TDuration REQUEST_EXPIRATION_INTERVAL = TDuration::Seconds(30);
static constexpr TDuration WRITER_EXPIRATION_INTERVAL = TDuration::Minutes(5);
+NActors::IActor* CreateKafkaProduceActor(const TActorId parent, const TString& clientDC) {
+ return new TKafkaProduceActor(parent, clientDC);;
+}
+
TString TKafkaProduceActor::LogPrefix() {
TStringBuilder sb;
sb << "TKafkaProduceActor " << SelfId() << " State: ";
@@ -398,9 +402,9 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) {
}
auto* r = pendingRequest.Request->Get()->Request;
- auto cookie = pendingRequest.Request->Get()->Cookie;
+ auto correlationId = pendingRequest.Request->Get()->CorrelationId;
- KAFKA_LOG_D("Send result for cookie " << cookie << ". Expired=" << expired);
+ KAFKA_LOG_D("Send result for correlationId " << correlationId << ". Expired=" << expired);
const auto topicsCount = r->TopicData.size();
auto response = std::make_shared<TProduceResponseData>();
@@ -445,7 +449,7 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) {
}
}
- Send(Client, new TEvKafka::TEvResponse(cookie, response));
+ Send(Client, new TEvKafka::TEvResponse(correlationId, response));
if (!pendingRequest.WaitAcceptingCookies.empty()) {
if (!expired) {
diff --git a/ydb/core/kafka_proxy/kafka_produce_actor.h b/ydb/core/kafka_proxy/actors/kafka_produce_actor.h
index c618da5494..43c7a17056 100644
--- a/ydb/core/kafka_proxy/kafka_produce_actor.h
+++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.h
@@ -5,7 +5,7 @@
#include <ydb/core/persqueue/writer/writer.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
-#include "kafka_events.h"
+#include "../kafka_events.h"
namespace NKafka {
diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp
index e486e457c7..cf19589853 100644
--- a/ydb/core/kafka_proxy/kafka_connection.cpp
+++ b/ydb/core/kafka_proxy/kafka_connection.cpp
@@ -5,8 +5,6 @@
#include "kafka_connection.h"
#include "kafka_events.h"
#include "kafka_messages.h"
-#include "kafka_produce_actor.h"
-#include "kafka_metadata_actor.h"
#include "kafka_log_impl.h"
#include <strstream>
@@ -18,6 +16,11 @@ namespace NKafka {
using namespace NActors;
using namespace NKikimr;
+NActors::IActor* CreateKafkaApiVersionsActor(const TActorId parent, const ui64 correlationId, const TApiVersionsRequestData* message);
+NActors::IActor* CreateKafkaInitProducerIdActor(const TActorId parent, const ui64 correlationId, const TInitProducerIdRequestData* message);
+NActors::IActor* CreateKafkaMetadataActor(const TActorId parent, const ui64 correlationId, const TMetadataRequestData* message);
+NActors::IActor* CreateKafkaProduceActor(const TActorId parent, const TString& clientDC);
+
char Hex(const unsigned char c) {
return c < 10 ? '0' + c : 'A' + c - 10;
}
@@ -34,41 +37,21 @@ void Print(const TString& marker, TBuffer& buffer, ssize_t length) {
KAFKA_LOG_ERROR("Packet " << marker << ": " << sb);
}
-TApiVersionsResponseData GetApiVersions() {
- TApiVersionsResponseData response;
- response.ApiKeys.resize(4);
-
- response.ApiKeys[0].ApiKey = PRODUCE;
- response.ApiKeys[0].MinVersion = 3; // From version 3 record batch format is 2. Supported only 2 batch format.
- response.ApiKeys[0].MaxVersion = TProduceRequestData::MessageMeta::PresentVersions.Max;
-
- response.ApiKeys[1].ApiKey = API_VERSIONS;
- response.ApiKeys[1].MinVersion = TApiVersionsRequestData::MessageMeta::PresentVersions.Min;
- response.ApiKeys[1].MaxVersion = TApiVersionsRequestData::MessageMeta::PresentVersions.Max;
-
- response.ApiKeys[2].ApiKey = METADATA;
- response.ApiKeys[2].MinVersion = TMetadataRequestData::MessageMeta::PresentVersions.Min;
- response.ApiKeys[2].MaxVersion = TMetadataRequestData::MessageMeta::PresentVersions.Max;
-
- response.ApiKeys[3].ApiKey = INIT_PRODUCER_ID;
- response.ApiKeys[3].MinVersion = TInitProducerIdRequestData::MessageMeta::PresentVersions.Min;
- response.ApiKeys[3].MaxVersion = TInitProducerIdRequestData::MessageMeta::PresentVersions.Max;
-
- return response;
-}
-
-static const TApiVersionsResponseData KAFKA_API_VERSIONS = GetApiVersions();
-
class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNetworkConfig {
public:
using TBase = TActorBootstrapped<TKafkaConnection>;
struct Msg {
+ using TPtr=std::shared_ptr<Msg>;
+
size_t Size = 0;
TKafkaInt32 ExpectedSize = 0;
TBuffer Buffer;
+
TRequestHeaderData Header;
- std::unique_ptr<TMessage> Message;
+ std::unique_ptr<TApiMessage> Message;
+
+ TApiMessage::TPtr Response;
};
static constexpr TDuration InactivityTimeout = TDuration::Minutes(10);
@@ -90,7 +73,10 @@ public:
NAddressClassifier::TLabeledAddressClassifier::TConstPtr DatacenterClassifier;
TString ClientDC;
- Msg Request;
+ i32 CorrelationId = 0;
+ std::shared_ptr<Msg> Request;
+ std::unordered_map<ui64, Msg::TPtr> PendingRequests;
+ std::deque<Msg::TPtr> PendingRequestsQueue;
enum EReadSteps { SIZE_READ, SIZE_PREPARE, INFLIGTH_CHECK, MESSAGE_READ, MESSAGE_PROCESS };
EReadSteps Step;
@@ -101,7 +87,6 @@ public:
TActorId ProduceActorId;
- std::unordered_map<ui64, Msg> PendingRequests;
TKafkaConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address,
const NKikimrConfig::TKafkaProxyConfig& config)
@@ -120,7 +105,7 @@ public:
Schedule(InactivityTimeout, InactivityEvent = new TEvPollerReady(nullptr, false, false));
KAFKA_LOG_I("incoming connection opened " << Address);
- ProduceActorId = ctx.RegisterWithSameMailbox(new TKafkaProduceActor(SelfId(), ClientDC));
+ ProduceActorId = ctx.RegisterWithSameMailbox(CreateKafkaProduceActor(SelfId(), ClientDC));
OnAccept();
}
@@ -212,81 +197,92 @@ protected:
}
}
- void HandleMessage(TRequestHeaderData* header, TApiVersionsRequestData* /*message*/, size_t messageSize) {
- Reply(header, &KAFKA_API_VERSIONS);
-
- InflightSize -= messageSize;
+ void HandleMessage(TRequestHeaderData* header, const TApiVersionsRequestData* message) {
+ Register(CreateKafkaApiVersionsActor(SelfId(), header->CorrelationId, message));
}
- void HandleMessage(const TRequestHeaderData* header, const TProduceRequestData* message, size_t /*messageSize*/) {
- PendingRequests[header->CorrelationId] = std::move(Request);
+ void HandleMessage(const TRequestHeaderData* header, const TProduceRequestData* message) {
Send(ProduceActorId, new TEvKafka::TEvProduceRequest(header->CorrelationId, message));
}
- void Handle(TEvKafka::TEvResponse::TPtr response) {
- auto r = response->Get();
- Reply(r->Cookie, r->Response.get());
+ void HandleMessage(const TRequestHeaderData* header, const TInitProducerIdRequestData* message) {
+ Register(CreateKafkaInitProducerIdActor(SelfId(), header->CorrelationId, message));
}
- void HandleMessage(const TRequestHeaderData* header, const TInitProducerIdRequestData* /*message*/, size_t messageSize) {
- TInitProducerIdResponseData response;
- response.ProducerEpoch = 1;
- response.ProducerId = 1;
- response.ErrorCode = 0;
- response.ThrottleTimeMs = 0;
+ void HandleMessage(TRequestHeaderData* header, const TMetadataRequestData* message) {
+ Register(CreateKafkaMetadataActor(SelfId(), header->CorrelationId, message));
+ }
- Reply(header, &response);
+ void ProcessRequest() {
+ KAFKA_LOG_D("process message: ApiKey=" << Request->Header.RequestApiKey << ", ExpectedSize=" << Request->ExpectedSize
+ << ", Size=" << Request->Size);
- InflightSize -= messageSize;
- }
+ Msg::TPtr r = Request;
+ PendingRequestsQueue.push_back(r);
+ PendingRequests[r->Header.CorrelationId] = r;
- void HandleMessage(TRequestHeaderData* header, TMetadataRequestData* message, size_t /*messageSize*/) {
- PendingRequests[header->CorrelationId] = std::move(Request);
- Register(new TKafkaMetadataActor(header->CorrelationId, message, SelfId()));
- }
+ TApiMessage* message = Request->Message.get();
- void ProcessRequest() {
- KAFKA_LOG_D("process message: ApiKey=" << Request.Header.RequestApiKey << ", ExpectedSize=" << Request.ExpectedSize
- << ", Size=" << Request.Size);
- switch (Request.Header.RequestApiKey) {
+ switch (Request->Header.RequestApiKey) {
case PRODUCE:
- HandleMessage(&Request.Header, dynamic_cast<TProduceRequestData*>(Request.Message.get()), Request.ExpectedSize);
+ HandleMessage(&Request->Header, dynamic_cast<TProduceRequestData*>(message));
return;
case API_VERSIONS:
- HandleMessage(&Request.Header, dynamic_cast<TApiVersionsRequestData*>(Request.Message.get()), Request.ExpectedSize);
+ HandleMessage(&Request->Header, dynamic_cast<TApiVersionsRequestData*>(message));
return;
case INIT_PRODUCER_ID:
- HandleMessage(&Request.Header, dynamic_cast<TInitProducerIdRequestData*>(Request.Message.get()), Request.ExpectedSize);
+ HandleMessage(&Request->Header, dynamic_cast<TInitProducerIdRequestData*>(message));
return;
case METADATA:
- HandleMessage(&Request.Header, dynamic_cast<TMetadataRequestData*>(Request.Message.get()), Request.ExpectedSize);
+ HandleMessage(&Request->Header, dynamic_cast<TMetadataRequestData*>(message));
return;
default:
- KAFKA_LOG_ERROR("Unsupported message: ApiKey=" << Request.Header.RequestApiKey);
+ KAFKA_LOG_ERROR("Unsupported message: ApiKey=" << Request->Header.RequestApiKey);
+ PassAway();
}
}
- void Reply(const ui64 cookie, const TApiMessage* response) {
- auto it = PendingRequests.find(cookie);
+ void Handle(TEvKafka::TEvResponse::TPtr response) {
+ auto r = response->Get();
+ Reply(r->CorrelationId, r->Response);
+ }
+
+ void Reply(const ui64 correlationId, TApiMessage::TPtr response) {
+ auto it = PendingRequests.find(correlationId);
if (it == PendingRequests.end()) {
- KAFKA_LOG_ERROR("Unexpected cookie " << cookie);
+ KAFKA_LOG_ERROR("Unexpected correlationId " << correlationId);
return;
}
- auto& request = it->second;
- Reply(&request.Header, response);
-
- InflightSize -= request.ExpectedSize;
+ auto request = it->second;
+ request->Response = response;
+ request->Buffer.Clear();
- PendingRequests.erase(it);
+ ProcessReplyQueue();
DoRead();
}
+ void ProcessReplyQueue() {
+ while(!PendingRequestsQueue.empty()) {
+ auto& request = PendingRequestsQueue.front();
+ if (request->Response.get() == nullptr) {
+ break;
+ }
+
+ Reply(&request->Header, request->Response.get());
+
+ InflightSize -= request->ExpectedSize;
+
+ PendingRequests.erase(request->Header.CorrelationId);
+ PendingRequestsQueue.pop_front();
+ }
+ }
+
void Reply(const TRequestHeaderData* header, const TApiMessage* reply) {
TKafkaVersion headerVersion = ResponseHeaderVersion(header->RequestApiKey, header->RequestApiVersion);
TKafkaVersion version = header->RequestApiVersion;
@@ -328,69 +324,76 @@ protected:
}
received = res;
- Request.Size += received;
+ Request->Size += received;
Demand.Buffer += received;
Demand.Length -= received;
}
if (!Demand) {
switch (Step) {
case SIZE_READ:
- Demand = TReadDemand((char*)&(Request.ExpectedSize), sizeof(Request.ExpectedSize));
+ Request = std::make_unique<Msg>();
+ Demand = TReadDemand((char*)&(Request->ExpectedSize), sizeof(Request->ExpectedSize));
Step = SIZE_PREPARE;
break;
case SIZE_PREPARE:
- NormalizeNumber(Request.ExpectedSize);
- if ((ui64)Request.ExpectedSize > Config.GetMaxMessageSize()) {
- KAFKA_LOG_ERROR("message is big. Size: " << Request.ExpectedSize);
+ NormalizeNumber(Request->ExpectedSize);
+ if ((ui64)Request->ExpectedSize > Config.GetMaxMessageSize()) {
+ KAFKA_LOG_ERROR("message is big. Size: " << Request->ExpectedSize);
return PassAway();
}
Step = INFLIGTH_CHECK;
case INFLIGTH_CHECK:
- if (InflightSize + Request.ExpectedSize > Config.GetMaxInflightSize()) {
+ if (InflightSize + Request->ExpectedSize > Config.GetMaxInflightSize()) {
return;
}
- InflightSize += Request.ExpectedSize;
+ InflightSize += Request->ExpectedSize;
Step = MESSAGE_READ;
case MESSAGE_READ:
- KAFKA_LOG_T("start read new message. ExpectedSize=" << Request.ExpectedSize);
+ KAFKA_LOG_T("start read new message. ExpectedSize=" << Request->ExpectedSize);
- Request.Buffer.Resize(Request.ExpectedSize);
- Demand = TReadDemand(Request.Buffer.Data(), Request.ExpectedSize);
+ Request->Buffer.Resize(Request->ExpectedSize);
+ Demand = TReadDemand(Request->Buffer.Data(), Request->ExpectedSize);
Step = MESSAGE_PROCESS;
break;
case MESSAGE_PROCESS:
- TKafkaInt16 apiKey = *(TKafkaInt16*)Request.Buffer.Data();
- TKafkaVersion apiVersion = *(TKafkaVersion*)(Request.Buffer.Data() + sizeof(TKafkaInt16));
+ TKafkaInt16 apiKey = *(TKafkaInt16*)Request->Buffer.Data();
+ TKafkaVersion apiVersion = *(TKafkaVersion*)(Request->Buffer.Data() + sizeof(TKafkaInt16));
NormalizeNumber(apiKey);
NormalizeNumber(apiVersion);
- KAFKA_LOG_D("received message. ApiKey=" << Request.Header.RequestApiKey
- << ", Version=" << Request.Header.RequestApiVersion);
+ KAFKA_LOG_D("received message. ApiKey=" << apiKey << ", Version=" << apiVersion);
- // Print("received", Request.Buffer, Request.ExpectedSize);
+ // Print("received", Request->Buffer, Request->ExpectedSize);
- TKafkaReadable readable(Request.Buffer);
+ TKafkaReadable readable(Request->Buffer);
- Request.Message = CreateRequest(apiKey);
+ Request->Message = CreateRequest(apiKey);
try {
- Request.Header.Read(readable, RequestHeaderVersion(apiKey, apiVersion));
- Request.Message->Read(readable, apiVersion);
+ Request->Header.Read(readable, RequestHeaderVersion(apiKey, apiVersion));
+ if (Request->Header.CorrelationId != CorrelationId) {
+ KAFKA_LOG_ERROR("Unexpected correlationId. Expected=" << CorrelationId << ", Received=" << Request->Header.CorrelationId);
+ return PassAway();
+ }
+ Request->Message->Read(readable, apiVersion);
+
+ ++CorrelationId;
} catch(const yexception& e) {
- KAFKA_LOG_ERROR("error on processing message: ApiKey=" << Request.Header.RequestApiKey
- << ", Version=" << Request.Header.RequestApiVersion
+ KAFKA_LOG_ERROR("error on processing message: ApiKey=" << Request->Header.RequestApiKey
+ << ", Version=" << Request->Header.RequestApiVersion
<< ", Error=" << e.what());
- return PassAway();
+ return PassAway();
}
+ Step = SIZE_READ;
+
ProcessRequest();
- Step = SIZE_READ;
break;
}
}
diff --git a/ydb/core/kafka_proxy/kafka_events.h b/ydb/core/kafka_proxy/kafka_events.h
index f674fe9804..cf9bcfadd6 100644
--- a/ydb/core/kafka_proxy/kafka_events.h
+++ b/ydb/core/kafka_proxy/kafka_events.h
@@ -25,22 +25,22 @@ struct TEvKafka {
struct TEvProduceRequest : public TEventLocal<TEvProduceRequest, EvProduceRequest> {
- TEvProduceRequest(const ui64 cookie, const TProduceRequestData* request)
- : Cookie(cookie)
+ TEvProduceRequest(const ui64 correlationId, const TProduceRequestData* request)
+ : CorrelationId(correlationId)
, Request(request)
{}
- ui64 Cookie;
+ ui64 CorrelationId;
const TProduceRequestData* Request;
};
struct TEvResponse : public TEventLocal<TEvResponse, EvResponse> {
- TEvResponse(const ui64 cookie, const TApiMessage::TPtr response)
- : Cookie(cookie)
+ TEvResponse(const ui64 correlationId, const TApiMessage::TPtr response)
+ : CorrelationId(correlationId)
, Response(std::move(response)) {
}
- ui64 Cookie;
+ const ui64 CorrelationId;
const TApiMessage::TPtr Response;
};
diff --git a/ydb/core/kafka_proxy/ut/metarequest_ut.cpp b/ydb/core/kafka_proxy/ut/metarequest_ut.cpp
index c7cbb25e88..cb8da65f67 100644
--- a/ydb/core/kafka_proxy/ut/metarequest_ut.cpp
+++ b/ydb/core/kafka_proxy/ut/metarequest_ut.cpp
@@ -1,7 +1,7 @@
#include <library/cpp/testing/unittest/registar.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h>
#include <ydb/core/kafka_proxy/kafka_events.h>
-#include <ydb/core/kafka_proxy/kafka_metadata_actor.h>
+#include <ydb/core/kafka_proxy/actors/kafka_metadata_actor.h>
namespace NKafka::NTests {
@@ -20,7 +20,7 @@ Y_UNIT_TEST_SUITE(TMetadataActorTests) {
auto GetEvent(NPersQueue::TTestServer& server, const TActorId& edgeActor, const TVector<TString>& topics) {
auto* runtime = server.CleverServer->GetRuntime();
auto request = GetMetadataRequest(topics);
- auto actorId = runtime->Register(new TKafkaMetadataActor(1, request.Get(), edgeActor));
+ auto actorId = runtime->Register(new TKafkaMetadataActor(edgeActor, 1, request.Get()));
runtime->EnableScheduleForActor(actorId);
runtime->DispatchEvents();
Cerr << "Wait for response for topics: '";
diff --git a/ydb/core/kafka_proxy/ya.make b/ydb/core/kafka_proxy/ya.make
index 326d8f9080..03fab884a5 100644
--- a/ydb/core/kafka_proxy/ya.make
+++ b/ydb/core/kafka_proxy/ya.make
@@ -1,6 +1,10 @@
LIBRARY()
SRCS(
+ actors/kafka_api_versions_actor.cpp
+ actors/kafka_init_producer_id_actor.cpp
+ actors/kafka_metadata_actor.cpp
+ actors/kafka_produce_actor.cpp
kafka_connection.cpp
kafka_connection.h
kafka_listener.h
@@ -11,9 +15,6 @@ SRCS(
kafka_messages.h
kafka_messages_int.cpp
kafka_messages_int.h
- kafka_produce_actor.cpp
- kafka_metadata_actor.cpp
- kafka_produce_actor.h
kafka_proxy.h
kafka_records.cpp
)