author     tesseract <tesseract@yandex-team.com>  2023-08-30 09:54:58 +0300
committer  tesseract <tesseract@yandex-team.com>  2023-08-30 10:25:15 +0300
commit     d6dfde60c0bc97c0acffa53994186d90c6144fa9 (patch)
tree       39b3d0c4ad1c4a88e66647eaa2c64ee710785e62
parent     a8938c851e738f63eb082d0d51cf0beb994fca3c (diff)
download   ydb-d6dfde60c0bc97c0acffa53994186d90c6144fa9.tar.gz
Accurate checking of incoming parameters for Kafka protocol
-rw-r--r--  ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp   47
-rw-r--r--  ydb/core/kafka_proxy/actors/kafka_produce_actor.h      8
-rw-r--r--  ydb/core/kafka_proxy/kafka_connection.cpp             103
-rw-r--r--  ydb/core/kafka_proxy/ut/ut_protocol.cpp               377
4 files changed, 350 insertions, 185 deletions
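
This change tightens validation on the Kafka proxy's read path: the connection now rejects frames whose declared size is negative, larger than MaxMessageSize, or smaller than the fixed request-header prefix, and it reads and validates that header (ApiKey, ApiVersion, CorrelationId, plus duplicate-correlation and authentication checks) in dedicated HEADER_READ/HEADER_PROCESS steps before reading the body. The produce actor switches pending requests to std::shared_ptr ownership to fix cookie lifetime, and the protocol unit tests gain a reusable TTestClient helper plus end-to-end produce-and-read checks against a topic reader.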
diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
index 11b3cd582f..5fd4d2b2da 100644
--- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
@@ -215,7 +215,7 @@ void TKafkaProduceActor::ProcessRequests(const TActorContext& ctx) {
}
if (EnqueueInitialization()) {
- PendingRequests.push_back({Requests.front()});
+ PendingRequests.push_back(std::make_shared<TPendingRequest>(Requests.front()));
Requests.pop_front();
ProcessRequest(PendingRequests.back(), ctx);
@@ -316,11 +316,11 @@ size_t PartsCount(const TProduceRequestData* r) {
return result;
}
-void TKafkaProduceActor::ProcessRequest(TPendingRequest& pendingRequest, const TActorContext& ctx) {
- auto* r = pendingRequest.Request->Get()->Request;
+void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, const TActorContext& ctx) {
+ auto* r = pendingRequest->Request->Get()->Request;
- pendingRequest.Results.resize(PartsCount(r));
- pendingRequest.StartTime = ctx.Now();
+ pendingRequest->Results.resize(PartsCount(r));
+ pendingRequest->StartTime = ctx.Now();
size_t position = 0;
for(const auto& topicData : r->TopicData) {
@@ -335,16 +335,16 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest& pendingRequest, const T
cookieInfo.TopicPath = topicPath;
cookieInfo.PartitionId = partitionId;
cookieInfo.Position = position;
- cookieInfo.Request = &pendingRequest;
+ cookieInfo.Request = pendingRequest;
- pendingRequest.WaitAcceptingCookies.insert(ownCookie);
- pendingRequest.WaitResultCookies.insert(ownCookie);
+ pendingRequest->WaitAcceptingCookies.insert(ownCookie);
+ pendingRequest->WaitResultCookies.insert(ownCookie);
auto ev = Convert(partitionData, *topicData.Name, ownCookie, ClientDC);
Send(writer.second, std::move(ev));
} else {
- auto& result = pendingRequest.Results[position];
+ auto& result = pendingRequest->Results[position];
switch (writer.first) {
case NOT_FOUND:
result.ErrorCode = EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION;
@@ -359,7 +359,7 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest& pendingRequest, const T
}
}
- if (pendingRequest.WaitResultCookies.empty()) {
+ if (pendingRequest->WaitResultCookies.empty()) {
// All request for unknown topic or empty request
SendResults(ctx);
} else {
@@ -407,8 +407,6 @@ void TKafkaProduceActor::Handle(TEvPartitionWriter::TEvWriteResponse::TPtr reque
partitionResult.Value = request;
cookieInfo.Request->WaitResultCookies.erase(cookie);
- Cookies.erase(it);
-
if (!r->IsSuccess()) {
auto wit = Writers.find(cookieInfo.TopicPath);
if (wit != Writers.end()) {
@@ -424,6 +422,8 @@ void TKafkaProduceActor::Handle(TEvPartitionWriter::TEvWriteResponse::TPtr reque
if (cookieInfo.Request->WaitResultCookies.empty()) {
SendResults(ctx);
}
+
+ Cookies.erase(it);
}
EKafkaErrors Convert(TEvPartitionWriter::TEvWriteResponse::EErrors value) {
@@ -443,17 +443,17 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) {
// We send the results in the order of receipt of the request
while (!PendingRequests.empty()) {
- auto& pendingRequest = PendingRequests.front();
+ auto pendingRequest = PendingRequests.front();
// We send the response by timeout. This is possible, for example, if the event was lost or the PartitionWrite died.
- bool expired = expireTime > pendingRequest.StartTime;
+ bool expired = expireTime > pendingRequest->StartTime;
- if (!expired && !pendingRequest.WaitResultCookies.empty()) {
+ if (!expired && !pendingRequest->WaitResultCookies.empty()) {
return;
}
- auto* r = pendingRequest.Request->Get()->Request;
- auto correlationId = pendingRequest.Request->Get()->CorrelationId;
+ auto* r = pendingRequest->Request->Get()->Request;
+ auto correlationId = pendingRequest->Request->Get()->CorrelationId;
KAFKA_LOG_D("Send result for correlationId " << correlationId << ". Expired=" << expired);
@@ -474,7 +474,7 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) {
for(size_t j = 0; j < partitionCount; ++j) {
const auto& partitionData = topicData.PartitionData[j];
auto& partitionResponse = topicResponse.PartitionResponses[j];
- const auto& result = pendingRequest.Results[position++];
+ const auto& result = pendingRequest->Results[position++];
partitionResponse.Index = partitionData.Index;
@@ -505,11 +505,11 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) {
Send(Context->ConnectionId, new TEvKafka::TEvResponse(correlationId, response));
- if (!pendingRequest.WaitAcceptingCookies.empty()) {
+ if (!pendingRequest->WaitAcceptingCookies.empty()) {
if (!expired) {
TStringBuilder sb;
sb << "All TEvWriteResponse were received, but not all TEvWriteAccepted. Unreceived cookies:";
- for(auto cookie : pendingRequest.WaitAcceptingCookies) {
+ for(auto cookie : pendingRequest->WaitAcceptingCookies) {
sb << " " << cookie;
}
KAFKA_LOG_W(sb);
@@ -519,6 +519,13 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) {
}
}
+ for(auto cookie : pendingRequest->WaitAcceptingCookies) {
+ Cookies.erase(cookie);
+ }
+ for(auto cookie : pendingRequest->WaitResultCookies) {
+ Cookies.erase(cookie);
+ }
+
PendingRequests.pop_front();
}
}
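
The central produce-actor fix above is an ownership one: TCookieInfo previously held a raw TPendingRequest* into the PendingRequests deque, which dangles once SendResults() pops the front element while late TEvPartitionWriter responses still carry cookies pointing at it. Below is a minimal standalone sketch of the pattern, with simplified stand-in types rather than the actual actor code:

#include <cassert>
#include <deque>
#include <map>
#include <memory>

struct TPendingRequest {
    using TPtr = std::shared_ptr<TPendingRequest>;
    int CorrelationId = 0;
};

struct TCookieInfo {
    TPendingRequest::TPtr Request; // previously TPendingRequest*, which dangled
};

int main() {
    std::deque<TPendingRequest::TPtr> pendingRequests;
    std::map<unsigned long, TCookieInfo> cookies;

    auto request = std::make_shared<TPendingRequest>();
    request->CorrelationId = 42;

    pendingRequests.push_back(request);
    cookies[1] = TCookieInfo{pendingRequests.back()};

    // The response expires and the request is popped from the queue...
    pendingRequests.pop_front();

    // ...but a late write-result cookie can still be resolved safely:
    // shared ownership keeps the request alive until the cookie is erased.
    assert(cookies[1].Request->CorrelationId == 42);
    cookies.erase(1);
    return 0;
}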
diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.h b/ydb/core/kafka_proxy/actors/kafka_produce_actor.h
index b37c689096..70f4d09ab9 100644
--- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.h
+++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.h
@@ -118,7 +118,7 @@ private:
// Logic
void ProcessRequests(const TActorContext& ctx);
- void ProcessRequest(TPendingRequest& pendingRequest, const TActorContext& ctx);
+ void ProcessRequest(std::shared_ptr<TPendingRequest> pendingRequest, const TActorContext& ctx);
void SendResults(const TActorContext& ctx);
@@ -142,6 +142,8 @@ private:
TDeque<TEvKafka::TEvProduceRequest::TPtr> Requests;
struct TPendingRequest {
+ using TPtr = std::shared_ptr<TPendingRequest>;
+
TPendingRequest(TEvKafka::TEvProduceRequest::TPtr request)
: Request(request) {
}
@@ -160,14 +162,14 @@ private:
TInstant StartTime;
};
- TDeque<TPendingRequest> PendingRequests;
+ TDeque<TPendingRequest::TPtr> PendingRequests;
struct TCookieInfo {
TString TopicPath;
ui32 PartitionId;
size_t Position;
- TPendingRequest* Request;
+ TPendingRequest::TPtr Request;
};
std::map<ui64, TCookieInfo> Cookies;
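
A related ordering fix appears in Handle(TEvPartitionWriter::TEvWriteResponse) above: cookieInfo is a reference into the Cookies map, so Cookies.erase(it) has to move after the last use of that reference. A minimal sketch of the hazard, again with stand-in types:

#include <map>
#include <set>

int main() {
    std::map<unsigned long, std::set<unsigned long>> cookies;
    cookies[7].insert(7);

    auto it = cookies.find(7);
    auto& waitSet = it->second;

    // Wrong order: erasing the map entry here would leave waitSet dangling,
    // which is exactly what moving Cookies.erase(it) to the end avoids.
    waitSet.erase(7);
    bool allDone = waitSet.empty(); // still safe: the entry is alive

    cookies.erase(it); // correct: erase only as the final step
    return allDone ? 0 : 1;
}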
diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp
index 696d079981..a783cb00d7 100644
--- a/ydb/core/kafka_proxy/kafka_connection.cpp
+++ b/ydb/core/kafka_proxy/kafka_connection.cpp
@@ -16,22 +16,9 @@ namespace NKafka {
using namespace NActors;
using namespace NKikimr;
-
-char Hex(const unsigned char c) {
- return c < 10 ? '0' + c : 'A' + c - 10;
-}
-
-void Print(const TString& marker, TBuffer& buffer, ssize_t length) {
- TStringBuilder sb;
- for (ssize_t i = 0; i < length; ++i) {
- char c = buffer.Data()[i];
- if (i > 0) {
- sb << ", ";
- }
- sb << "0x" << Hex((c & 0xF0) >> 4) << Hex(c & 0x0F);
- }
- KAFKA_LOG_ERROR("Packet " << marker << ": " << sb);
-}
+static constexpr size_t HeaderSize = sizeof(TKafkaInt16) /* apiKey */ +
+ sizeof(TKafkaVersion) /* version */ +
+ sizeof(TKafkaInt32) /* correlationId */;
class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNetworkConfig {
public:
@@ -44,6 +31,10 @@ public:
TKafkaInt32 ExpectedSize = 0;
TBuffer Buffer;
+ TKafkaInt16 ApiKey;
+ TKafkaVersion ApiVersion;
+ TKafkaInt32 CorrelationId;
+
TRequestHeaderData Header;
std::unique_ptr<TApiMessage> Message;
@@ -72,7 +63,7 @@ public:
std::unordered_map<ui64, Msg::TPtr> PendingRequests;
std::deque<Msg::TPtr> PendingRequestsQueue;
- enum EReadSteps { SIZE_READ, SIZE_PREPARE, INFLIGTH_CHECK, MESSAGE_READ, MESSAGE_PROCESS };
+ enum EReadSteps { SIZE_READ, SIZE_PREPARE, INFLIGTH_CHECK, HEADER_READ, HEADER_PROCESS, MESSAGE_READ, MESSAGE_PROCESS };
EReadSteps Step;
TReadDemand Demand;
@@ -230,11 +221,6 @@ protected:
Msg::TPtr r = Request;
- if (!Context->Authenticated() && RequireAuthentication(static_cast<EApiKey>(Request->Header.RequestApiKey))) {
- KAFKA_LOG_ERROR("unauthenticated request: ApiKey=" << Request->Header.RequestApiKey);
- return PassAway();
- }
-
PendingRequestsQueue.push_back(r);
PendingRequests[r->Header.CorrelationId] = r;
@@ -405,10 +391,20 @@ protected:
case SIZE_PREPARE:
NormalizeNumber(Request->ExpectedSize);
+ if (Request->ExpectedSize < 0) {
+ KAFKA_LOG_ERROR("Wrong message size. Size: " << Request->ExpectedSize);
+ return PassAway();
+ }
if ((ui64)Request->ExpectedSize > Context->Config.GetMaxMessageSize()) {
- KAFKA_LOG_ERROR("message is big. Size: " << Request->ExpectedSize << ". MaxSize: " << Context->Config.GetMaxMessageSize());
+ KAFKA_LOG_ERROR("message is big. Size: " << Request->ExpectedSize << ". MaxSize: "
+ << Context->Config.GetMaxMessageSize());
return PassAway();
}
+ if (static_cast<size_t>(Request->ExpectedSize) < HeaderSize) {
+ KAFKA_LOG_ERROR("message is small. Size: " << Request->ExpectedSize);
+ return PassAway();
+ }
+
Step = INFLIGTH_CHECK;
case INFLIGTH_CHECK:
@@ -423,43 +419,58 @@ protected:
InflightSize += Request->ExpectedSize;
Step = MESSAGE_READ;
+ case HEADER_READ:
+ KAFKA_LOG_T("start read header. ExpectedSize=" << Request->ExpectedSize);
+
+ Request->Buffer.Resize(HeaderSize);
+ Demand = TReadDemand(Request->Buffer.Data(), HeaderSize);
+
+ Step = HEADER_PROCESS;
+ break;
+
+ case HEADER_PROCESS:
+ Request->ApiKey = *(TKafkaInt16*)Request->Buffer.Data();
+ Request->ApiVersion = *(TKafkaVersion*)(Request->Buffer.Data() + sizeof(TKafkaInt16));
+ Request->CorrelationId = *(TKafkaInt32*)(Request->Buffer.Data() + sizeof(TKafkaInt16) + sizeof(TKafkaVersion));
+
+ NormalizeNumber(Request->ApiKey);
+ NormalizeNumber(Request->ApiVersion);
+ NormalizeNumber(Request->CorrelationId);
+
+ if (PendingRequests.contains(Request->CorrelationId)) {
+ KAFKA_LOG_ERROR("CorrelationId " << Request->CorrelationId << " already processing");
+ return PassAway();
+ }
+ if (!Context->Authenticated() && RequireAuthentication(static_cast<EApiKey>(Request->ApiKey))) {
+ KAFKA_LOG_ERROR("unauthenticated request: ApiKey=" << Request->ApiKey);
+ return PassAway();
+ }
+
+ Step = MESSAGE_READ;
+
case MESSAGE_READ:
KAFKA_LOG_T("start read new message. ExpectedSize=" << Request->ExpectedSize);
Request->Buffer.Resize(Request->ExpectedSize);
- Demand = TReadDemand(Request->Buffer.Data(), Request->ExpectedSize);
+ Demand = TReadDemand(Request->Buffer.Data() + HeaderSize, Request->ExpectedSize - HeaderSize);
Step = MESSAGE_PROCESS;
break;
case MESSAGE_PROCESS:
- TKafkaInt16 apiKey = *(TKafkaInt16*)Request->Buffer.Data();
- TKafkaVersion apiVersion = *(TKafkaVersion*)(Request->Buffer.Data() + sizeof(TKafkaInt16));
- TKafkaInt32 correlationId = *(TKafkaInt32*)(Request->Buffer.Data() + sizeof(TKafkaInt16) + sizeof(TKafkaInt16));
-
- NormalizeNumber(apiKey);
- NormalizeNumber(apiVersion);
- NormalizeNumber(correlationId);
-
- KAFKA_LOG_D("received message. ApiKey=" << apiKey << ", Version=" << apiVersion << ", CorrelationId=" << correlationId);
-
- if (PendingRequests.contains(correlationId)) {
- KAFKA_LOG_ERROR("CorrelationId " << correlationId << " already processing");
- return PassAway();
- }
-
- // Print("received", Request->Buffer, Request->ExpectedSize);
+ KAFKA_LOG_D("received message. ApiKey=" << Request->ApiKey << ", Version=" << Request->ApiVersion << ", CorrelationId=" << Request->CorrelationId);
TKafkaReadable readable(Request->Buffer);
- Request->Message = CreateRequest(apiKey);
try {
- Request->Header.Read(readable, RequestHeaderVersion(apiKey, apiVersion));
- Request->Message->Read(readable, apiVersion);
+ Request->Message = CreateRequest(Request->ApiKey);
+
+ Request->Header.Read(readable, RequestHeaderVersion(Request->ApiKey, Request->ApiVersion));
+ Request->Message->Read(readable, Request->ApiVersion);
} catch(const yexception& e) {
- KAFKA_LOG_ERROR("error on processing message: ApiKey=" << apiKey
- << ", Version=" << apiVersion
- << ", CorrelationId=" << correlationId
+ KAFKA_LOG_ERROR("error on processing message: ApiKey=" << Request->ApiKey
+ << ", Version=" << Request->ApiVersion
+ << ", CorrelationId=" << Request->CorrelationId
<< ", Error=" << e.what());
return PassAway();
}
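
The new HEADER_READ/HEADER_PROCESS steps rely on every Kafka request starting with a fixed 8-byte, big-endian prefix: an int16 ApiKey, an int16 ApiVersion, and an int32 CorrelationId. The following standalone sketch mirrors the same validation flow using plain C++ types in place of the YDB helpers (LoadBigEndian stands in for NormalizeNumber; the real actor also checks the signed size for negativity before this point):

#include <cstddef>
#include <cstdint>
#include <optional>
#include <type_traits>

struct THeader {
    int16_t ApiKey = 0;
    int16_t ApiVersion = 0;
    int32_t CorrelationId = 0;
};

// The fixed prefix: apiKey + apiVersion + correlationId.
static constexpr size_t HeaderSize = sizeof(int16_t) + sizeof(int16_t) + sizeof(int32_t);

// Big-endian load; plays the role NormalizeNumber plays in the connection actor.
template <typename T>
T LoadBigEndian(const uint8_t* p) {
    using U = std::make_unsigned_t<T>;
    U value = 0;
    for (size_t i = 0; i < sizeof(T); ++i) {
        value = static_cast<U>((value << 8) | p[i]);
    }
    return static_cast<T>(value);
}

// Mirrors the SIZE_PREPARE bounds checks and the HEADER_PROCESS field extraction.
std::optional<THeader> ParseHeader(const uint8_t* data, size_t frameSize, size_t maxMessageSize) {
    if (frameSize > maxMessageSize || frameSize < HeaderSize) {
        return std::nullopt; // the real actor logs the error and closes the connection
    }
    THeader h;
    h.ApiKey = LoadBigEndian<int16_t>(data);
    h.ApiVersion = LoadBigEndian<int16_t>(data + sizeof(int16_t));
    h.CorrelationId = LoadBigEndian<int32_t>(data + sizeof(int16_t) + sizeof(int16_t));
    return h;
}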
diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
index ac633aa3cc..280bc1cb28 100644
--- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp
+++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
@@ -20,7 +20,6 @@
#include <random>
-
using namespace NKafka;
using namespace NYdb;
using namespace NYdb::NTable;
@@ -32,7 +31,7 @@ static constexpr const char NON_CHARGEABLE_USER_Y[] = "superuser_y@builtin";
static constexpr const char DEFAULT_CLOUD_ID[] = "somecloud";
static constexpr const char DEFAULT_FOLDER_ID[] = "somefolder";
-struct WithSslAndAuth : TKikimrTestSettings {
+struct WithSslAndAuth: TKikimrTestSettings {
static constexpr bool SSL = true;
static constexpr bool AUTH = true;
};
@@ -54,8 +53,7 @@ void Print(const TBuffer& buffer) {
Cerr << ">>>>> Packet sent: " << sb << Endl;
}
-
-template<class TKikimr, bool secure>
+template <class TKikimr, bool secure>
class TTestServer {
public:
TIpPort Port;
@@ -83,7 +81,6 @@ public:
appConfig.MutableKafkaProxyConfig()->SetMaxMessageSize(1024);
appConfig.MutableKafkaProxyConfig()->SetMaxInflightSize(2048);
-
appConfig.MutablePQConfig()->MutableQuotingConfig()->SetEnableQuoting(true);
appConfig.MutablePQConfig()->MutableQuotingConfig()->SetQuotaWaitDurationMs(300);
appConfig.MutablePQConfig()->MutableQuotingConfig()->SetPartitionReadQuotaIsTwiceWriteQuota(true);
@@ -116,15 +113,7 @@ public:
if (secure) {
appConfig.MutablePQConfig()->SetRequireCredentialsInNewProtocol(true);
}
- KikimrServer = std::unique_ptr<TKikimr>(new TKikimr(
- std::move(appConfig),
- {},
- {},
- false,
- nullptr,
- nullptr,
- 0)
- );
+ KikimrServer = std::unique_ptr<TKikimr>(new TKikimr(std::move(appConfig), {}, {}, false, nullptr, nullptr, 0));
KikimrServer->GetRuntime()->SetLogPriority(NKikimrServices::KAFKA_PROXY, NActors::NLog::PRI_TRACE);
ui16 grpc = KikimrServer->GetPort();
@@ -142,21 +131,38 @@ public:
NYdb::NScheme::TSchemeClient schemeClient(*Driver);
NYdb::NScheme::TPermissions permissions("user@builtin", {"ydb.generic.read", "ydb.generic.write"});
- auto result = schemeClient.ModifyPermissions("/Root",
- NYdb::NScheme::TModifyPermissionsSettings().AddGrantPermissions(permissions)
- ).ExtractValueSync();
+ auto result = schemeClient
+ .ModifyPermissions(
+ "/Root", NYdb::NScheme::TModifyPermissionsSettings().AddGrantPermissions(permissions))
+ .ExtractValueSync();
Cerr << result.GetIssues().ToString() << "\n";
UNIT_ASSERT(result.IsSuccess());
}
TClient client(*(KikimrServer->ServerSettings));
- UNIT_ASSERT_VALUES_EQUAL(NMsgBusProxy::MSTATUS_OK,
- client.AlterUserAttributes("/", "Root", {{"folder_id", DEFAULT_FOLDER_ID},
- {"cloud_id", DEFAULT_CLOUD_ID},
- {"database_id", "root"}}));
+ UNIT_ASSERT_VALUES_EQUAL(
+ NMsgBusProxy::MSTATUS_OK,
+ client.AlterUserAttributes("/", "Root",
+ {{"folder_id", DEFAULT_FOLDER_ID},
+ {"cloud_id", DEFAULT_CLOUD_ID},
+ {"database_id", "root"},
+ {"serverless_rt_coordination_node_path", "/Coordinator/Root"},
+ {"serverless_rt_base_resource_ru", "/ru_Root"}}));
- auto status = client.CreateUser("/Root", "ouruser", "ourUserPassword");
- UNIT_ASSERT_VALUES_EQUAL(status, NMsgBusProxy::MSTATUS_OK);
+ {
+ auto status = client.CreateUser("/Root", "ouruser", "ourUserPassword");
+ UNIT_ASSERT_VALUES_EQUAL(status, NMsgBusProxy::MSTATUS_OK);
+
+ NYdb::NScheme::TSchemeClient schemeClient(*Driver);
+ NYdb::NScheme::TPermissions permissions("ouruser", {"ydb.generic.read", "ydb.generic.write"});
+
+ auto result = schemeClient
+ .ModifyPermissions(
+ "/Root", NYdb::NScheme::TModifyPermissionsSettings().AddGrantPermissions(permissions))
+ .ExtractValueSync();
+ Cerr << result.GetIssues().ToString() << "\n";
+ UNIT_ASSERT(result.IsSuccess());
+ }
}
public:
@@ -217,141 +223,280 @@ std::unique_ptr<TApiMessage> Read(TSocketInput& si, TRequestHeaderData* requestH
return response;
}
+void AssertMessageMeta(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& msg, const TString& field,
+ const TString& expectedValue) {
+ if (msg.GetMessageMeta()) {
+ for (auto& [k, v] : msg.GetMessageMeta()->Fields) {
+ Cerr << ">>>>> key=" << k << ", value=" << v << Endl;
+ if (field == k) {
+ UNIT_ASSERT_STRINGS_EQUAL(v, expectedValue);
+ return;
+ }
+ }
+ }
+ UNIT_ASSERT_C(false, "Field " << field << " not found in message meta");
+}
+
+TMaybe<NTopic::TReadSessionEvent::TDataReceivedEvent> Read(std::shared_ptr<NYdb::NTopic::IReadSession> reader) {
+ TMaybe<NTopic::TReadSessionEvent::TDataReceivedEvent> result;
+ while (true) {
+ auto event = reader->GetEvent(true);
+ if (!event)
+ return result;
+ if (auto dataEvent = std::get_if<NTopic::TReadSessionEvent::TDataReceivedEvent>(&*event)) {
+ result = *dataEvent;
+ break;
+ } else if (auto* lockEv = std::get_if<NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*event)) {
+ lockEv->Confirm();
+ } else if (auto* releaseEv = std::get_if<NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&*event)) {
+ releaseEv->Confirm();
+ } else if (auto* closeSessionEvent = std::get_if<NTopic::TSessionClosedEvent>(&*event)) {
+ return result;
+ }
+ }
+ return result;
+}
+
+class TTestClient {
+public:
+ TTestClient(ui16 port, const TString clientName = "TestClient")
+ : Addr("localhost", port)
+ , Socket(Addr)
+ , So(Socket)
+ , Si(Socket)
+ , Correlation(0)
+ , ClientName(clientName) {
+ }
+
+ TApiVersionsResponseData::TPtr ApiVersions() {
+ Cerr << ">>>>> ApiVersionsRequest\n";
+
+ TRequestHeaderData header = Header(NKafka::EApiKey::API_VERSIONS, 2);
+
+ TApiVersionsRequestData request;
+ request.ClientSoftwareName = "SuperTest";
+ request.ClientSoftwareVersion = "3100.7.13";
+
+ return WriteAndRead<TApiVersionsResponseData>(header, request);
+ }
+
+ TSaslHandshakeResponseData::TPtr SaslHandshake(const TString& mechanism = "PLAIN") {
+ Cerr << ">>>>> SaslHandshakeRequest\n";
+
+ TRequestHeaderData header = Header(NKafka::EApiKey::SASL_HANDSHAKE, 1);
+
+ TSaslHandshakeRequestData request;
+ request.Mechanism = mechanism;
+
+ return WriteAndRead<TSaslHandshakeResponseData>(header, request);
+ }
+
+ TSaslAuthenticateResponseData::TPtr SaslAuthenticate(const TString& user, const TString& password) {
+ Cerr << ">>>>> SaslAuthenticateRequestData\n";
+
+ TStringBuilder authBytes;
+ authBytes << "ignored" << '\0' << user << '\0' << password;
+
+ TRequestHeaderData header = Header(NKafka::EApiKey::SASL_AUTHENTICATE, 2);
+
+ TSaslAuthenticateRequestData request;
+ request.AuthBytes = TKafkaRawBytes(authBytes.data(), authBytes.size());
+
+ return WriteAndRead<TSaslAuthenticateResponseData>(header, request);
+ }
+
+ TInitProducerIdResponseData::TPtr InitProducerId() {
+ Cerr << ">>>>> TInitProducerIdRequestData\n";
+
+ TRequestHeaderData header = Header(NKafka::EApiKey::INIT_PRODUCER_ID, 4);
+
+ TInitProducerIdRequestData request;
+ request.TransactionTimeoutMs = 5000;
+
+ return WriteAndRead<TInitProducerIdResponseData>(header, request);
+ }
+
+ TProduceResponseData::TPtr Produce(const TString& topicName, ui32 partition, const TKafkaRecordBatch& batch) {
+ Cerr << ">>>>> TProduceRequestData\n";
+
+ TRequestHeaderData header = Header(NKafka::EApiKey::PRODUCE, 9);
+
+ TProduceRequestData request;
+ request.TopicData.resize(1);
+ request.TopicData[0].Name = topicName;
+ request.TopicData[0].PartitionData.resize(1);
+ request.TopicData[0].PartitionData[0].Index = partition;
+ request.TopicData[0].PartitionData[0].Records = batch;
+
+ return WriteAndRead<TProduceResponseData>(header, request);
+ }
+
+ void UnknownApiKey() {
+ Cerr << ">>>>> Unknown apiKey\n";
+
+ TRequestHeaderData header;
+ header.RequestApiKey = 7654;
+ header.RequestApiVersion = 1;
+ header.CorrelationId = NextCorrelation();
+ header.ClientId = ClientName;
+
+ TApiVersionsRequestData request;
+ request.ClientSoftwareName = "SuperTest";
+ request.ClientSoftwareVersion = "3100.7.13";
+
+ Write(So, &header, &request);
+ }
+
+protected:
+ ui32 NextCorrelation() {
+ return Correlation++;
+ }
+
+ template <class T>
+ typename T::TPtr WriteAndRead(TRequestHeaderData& header, TApiMessage& request) {
+ Write(So, &header, &request);
+
+ auto response = Read(Si, &header);
+ auto* msg = dynamic_cast<T*>(response.release());
+ return std::shared_ptr<T>(msg);
+ }
+
+ TRequestHeaderData Header(NKafka::EApiKey apiKey, TKafkaVersion version) {
+ TRequestHeaderData header;
+ header.RequestApiKey = apiKey;
+ header.RequestApiVersion = version;
+ header.CorrelationId = NextCorrelation();
+ header.ClientId = ClientName;
+ return header;
+ }
+
+private:
+ TNetworkAddress Addr;
+ TSocket Socket;
+ TSocketOutput So;
+ TSocketInput Si;
+
+ ui32 Correlation;
+ TString ClientName;
+};
+
Y_UNIT_TEST_SUITE(KafkaProtocol) {
Y_UNIT_TEST(ProduceScenario) {
TInsecureTestServer testServer;
TString topicName = "/Root/topic-0-test";
+ NYdb::NTopic::TTopicClient pqClient(*testServer.Driver);
{
- NYdb::NTopic::TTopicClient pqClient(*testServer.Driver);
- auto result = pqClient.CreateTopic(topicName).ExtractValueSync();
+ auto result =
+ pqClient
+ .CreateTopic(topicName,
+ NYdb::NTopic::TCreateTopicSettings().BeginAddConsumer("consumer-0").EndAddConsumer())
+ .ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
}
- TNetworkAddress addr("localhost", testServer.Port);
- TSocket s(addr);
- TSocketOutput so(s);
- TSocketInput si(s);
+ auto settings = NTopic::TReadSessionSettings()
+ .AppendTopics(NTopic::TTopicReadSettings(topicName))
+ .ConsumerName("consumer-0");
+ auto topicReader = pqClient.CreateReadSession(settings);
- size_t correlationId = 0;
+ TTestClient client(testServer.Port);
{
- Cerr << ">>>>> ApiVersionsRequest\n";
-
- TRequestHeaderData header;
- header.RequestApiKey = NKafka::EApiKey::API_VERSIONS;
- header.RequestApiVersion = 2;
- header.CorrelationId = correlationId++;
- header.ClientId = "test";
-
- TApiVersionsRequestData request;
- request.ClientSoftwareName = "SuperTest";
- request.ClientSoftwareVersion = "3100.7.13";
-
- Write(so, &header, &request);
-
- auto response = Read(si, &header);
- auto* msg = dynamic_cast<TApiVersionsResponseData*>(response.get());
+ auto msg = client.ApiVersions();
UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 6u);
}
{
- Cerr << ">>>>> SaslHandshakeRequest\n";
+ auto msg = client.SaslHandshake();
- TRequestHeaderData header;
- header.RequestApiKey = NKafka::EApiKey::SASL_HANDSHAKE;
- header.RequestApiVersion = 1;
- header.CorrelationId = correlationId++;
- header.ClientId = "test";
+ UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u);
+ UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN");
+ }
- TSaslHandshakeRequestData request;
- request.Mechanism = "PLAIN";
+ {
+ auto msg = client.SaslAuthenticate("ouruser@/Root", "ourUserPassword");
- Write(so, &header, &request);
+ UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ }
- auto response = Read(si, &header);
- auto* msg = dynamic_cast<TSaslHandshakeResponseData*>(response.get());
+ {
+ auto msg = client.InitProducerId();
UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
- UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u);
- UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN");
}
{
- Cerr << ">>>>> SaslAuthenticateRequestData\n";
- char authBytes[] = "ignored\0ouruser@/Root\0ourUserPassword";
+ TString key = "record-key";
+ TString value = "record-value";
+ TString headerKey = "header-key";
+ TString headerValue = "header-value";
+
+ TKafkaRecordBatch batch;
+ batch.BaseOffset = 3;
+ batch.BaseSequence = 5;
+ batch.Magic = 2; // Current supported
+ batch.Records.resize(1);
+ batch.Records[0].Key = TKafkaRawBytes(key.Data(), key.Size());
+ batch.Records[0].Value = TKafkaRawBytes(value.Data(), value.Size());
+ batch.Records[0].Headers.resize(1);
+ batch.Records[0].Headers[0].Key = TKafkaRawBytes(headerKey.Data(), headerKey.Size());
+ batch.Records[0].Headers[0].Value = TKafkaRawBytes(headerValue.Data(), headerValue.Size());
+
+ auto msg = client.Produce(topicName, 0, batch);
- TRequestHeaderData header;
- header.RequestApiKey = NKafka::EApiKey::SASL_AUTHENTICATE;
- header.RequestApiVersion = 2;
- header.CorrelationId = correlationId++;
- header.ClientId = "test";
-
- TSaslAuthenticateRequestData request;
- request.AuthBytes = TKafkaRawBytes(authBytes, sizeof(authBytes) - 1);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Name, topicName);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].Index, 0);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].ErrorCode,
+ static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
- Write(so, &header, &request);
+ auto m = Read(topicReader);
+ UNIT_ASSERT(m);
- auto response = Read(si, &header);
- auto* msg = dynamic_cast<TSaslAuthenticateResponseData*>(response.get());
+ UNIT_ASSERT_EQUAL(m->GetMessages().size(), 1);
+ auto& m0 = m->GetMessages()[0];
+ m0.Commit();
- UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ UNIT_ASSERT_STRINGS_EQUAL(m0.GetData(), value);
+ AssertMessageMeta(m0, "__key", key);
+ AssertMessageMeta(m0, headerKey, headerValue);
}
{
- Cerr << ">>>>> TInitProducerIdRequestData\n";
+ // Check short topic name
- TRequestHeaderData header;
- header.RequestApiKey = NKafka::EApiKey::INIT_PRODUCER_ID;
- header.RequestApiVersion = 4;
- header.CorrelationId = correlationId++;
- header.ClientId = "test";
+ TKafkaRecordBatch batch;
+ batch.BaseOffset = 7;
+ batch.BaseSequence = 11;
+ batch.Magic = 2; // Current supported
+ batch.Records.resize(1);
+ batch.Records[0].Key = "record-key-1";
+ batch.Records[0].Value = "record-value-1";
- TInitProducerIdRequestData request;
- request.TransactionTimeoutMs = 5000;
+ auto msg = client.Produce("topic-0-test", 0, batch);
- Write(so, &header, &request);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Name, "topic-0-test");
+ UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].Index, 0);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].ErrorCode,
+ static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
- auto response = Read(si, &header);
- auto* msg = dynamic_cast<TInitProducerIdResponseData*>(response.get());
+ auto m = Read(topicReader);
+ UNIT_ASSERT(m);
- UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ UNIT_ASSERT_EQUAL(m->GetMessages().size(), 1);
+ auto& m0 = m->GetMessages()[0];
+ m0.Commit();
}
{
- Cerr << ">>>>> TProduceRequestData\n";
-
- TRequestHeaderData header;
- header.RequestApiKey = NKafka::EApiKey::PRODUCE;
- header.RequestApiVersion = 9;
- header.CorrelationId = correlationId++;
- header.ClientId = "test-client-random-string";
-
- TProduceRequestData request;
- request.TopicData.resize(1);
- request.TopicData[0].Name = topicName;
- request.TopicData[0].PartitionData.resize(1);
- request.TopicData[0].PartitionData[0].Index = 0; // Partition id
- request.TopicData[0].PartitionData[0].Records.emplace();
- request.TopicData[0].PartitionData[0].Records->BaseOffset = 3;
- request.TopicData[0].PartitionData[0].Records->BaseSequence = 5;
- request.TopicData[0].PartitionData[0].Records->Magic = 2; // Current supported
- request.TopicData[0].PartitionData[0].Records->Records.resize(1);
- request.TopicData[0].PartitionData[0].Records->Records[0].Key = "record-key";
- request.TopicData[0].PartitionData[0].Records->Records[0].Value = "record-value";
-
- Write(so, &header, &request);
-
- auto response = Read(si, &header);
- auto* msg = dynamic_cast<TProduceResponseData*>(response.get());
-
- UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Name, topicName);
- UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].Index, 0);
- // UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ // Check unknown ApiKey (must be last. close the session)
+ // expect no exception
+ client.UnknownApiKey();
}
- }
-}
\ No newline at end of file
+ } // Y_UNIT_TEST(ProduceScenario)
+} // Y_UNIT_TEST_SUITE(KafkaProtocol)
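
For reference, a hedged usage fragment (not part of the diff) showing how the new TTestClient collapses the old socket boilerplate; it assumes the same test server, topic, and credentials that the ProduceScenario test sets up, and follows the same call order: ApiVersions, SaslHandshake, SaslAuthenticate, InitProducerId, Produce.

// Fragment only: relies on the test fixtures defined above.
TTestClient client(testServer.Port);

auto versions = client.ApiVersions();
UNIT_ASSERT_VALUES_EQUAL(versions->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));

client.SaslHandshake("PLAIN");
client.SaslAuthenticate("ouruser@/Root", "ourUserPassword");
client.InitProducerId();

TKafkaRecordBatch batch;
batch.Magic = 2; // currently the only supported magic
batch.Records.resize(1);
batch.Records[0].Key = "example-key";
batch.Records[0].Value = "example-value";

auto produced = client.Produce("topic-0-test", 0, batch);
UNIT_ASSERT_VALUES_EQUAL(produced->Responses[0].PartitionResponses[0].ErrorCode,
                         static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));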