diff options
author | tesseract <tesseract@yandex-team.com> | 2023-08-30 09:54:58 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-08-30 10:25:15 +0300 |
commit | d6dfde60c0bc97c0acffa53994186d90c6144fa9 (patch) | |
tree | 39b3d0c4ad1c4a88e66647eaa2c64ee710785e62 | |
parent | a8938c851e738f63eb082d0d51cf0beb994fca3c (diff) | |
download | ydb-d6dfde60c0bc97c0acffa53994186d90c6144fa9.tar.gz |
Accurate checking of incoming parameters for Kafka protocol
-rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp | 47 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_produce_actor.h | 8 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/kafka_connection.cpp | 103 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/ut/ut_protocol.cpp | 377 |
4 files changed, 350 insertions, 185 deletions
diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp index 11b3cd582f..5fd4d2b2da 100644 --- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp @@ -215,7 +215,7 @@ void TKafkaProduceActor::ProcessRequests(const TActorContext& ctx) { } if (EnqueueInitialization()) { - PendingRequests.push_back({Requests.front()}); + PendingRequests.push_back(std::make_shared<TPendingRequest>(Requests.front())); Requests.pop_front(); ProcessRequest(PendingRequests.back(), ctx); @@ -316,11 +316,11 @@ size_t PartsCount(const TProduceRequestData* r) { return result; } -void TKafkaProduceActor::ProcessRequest(TPendingRequest& pendingRequest, const TActorContext& ctx) { - auto* r = pendingRequest.Request->Get()->Request; +void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, const TActorContext& ctx) { + auto* r = pendingRequest->Request->Get()->Request; - pendingRequest.Results.resize(PartsCount(r)); - pendingRequest.StartTime = ctx.Now(); + pendingRequest->Results.resize(PartsCount(r)); + pendingRequest->StartTime = ctx.Now(); size_t position = 0; for(const auto& topicData : r->TopicData) { @@ -335,16 +335,16 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest& pendingRequest, const T cookieInfo.TopicPath = topicPath; cookieInfo.PartitionId = partitionId; cookieInfo.Position = position; - cookieInfo.Request = &pendingRequest; + cookieInfo.Request = pendingRequest; - pendingRequest.WaitAcceptingCookies.insert(ownCookie); - pendingRequest.WaitResultCookies.insert(ownCookie); + pendingRequest->WaitAcceptingCookies.insert(ownCookie); + pendingRequest->WaitResultCookies.insert(ownCookie); auto ev = Convert(partitionData, *topicData.Name, ownCookie, ClientDC); Send(writer.second, std::move(ev)); } else { - auto& result = pendingRequest.Results[position]; + auto& result = pendingRequest->Results[position]; switch (writer.first) { case NOT_FOUND: result.ErrorCode = EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION; @@ -359,7 +359,7 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest& pendingRequest, const T } } - if (pendingRequest.WaitResultCookies.empty()) { + if (pendingRequest->WaitResultCookies.empty()) { // All request for unknown topic or empty request SendResults(ctx); } else { @@ -407,8 +407,6 @@ void TKafkaProduceActor::Handle(TEvPartitionWriter::TEvWriteResponse::TPtr reque partitionResult.Value = request; cookieInfo.Request->WaitResultCookies.erase(cookie); - Cookies.erase(it); - if (!r->IsSuccess()) { auto wit = Writers.find(cookieInfo.TopicPath); if (wit != Writers.end()) { @@ -424,6 +422,8 @@ void TKafkaProduceActor::Handle(TEvPartitionWriter::TEvWriteResponse::TPtr reque if (cookieInfo.Request->WaitResultCookies.empty()) { SendResults(ctx); } + + Cookies.erase(it); } EKafkaErrors Convert(TEvPartitionWriter::TEvWriteResponse::EErrors value) { @@ -443,17 +443,17 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) { // We send the results in the order of receipt of the request while (!PendingRequests.empty()) { - auto& pendingRequest = PendingRequests.front(); + auto pendingRequest = PendingRequests.front(); // We send the response by timeout. This is possible, for example, if the event was lost or the PartitionWrite died. - bool expired = expireTime > pendingRequest.StartTime; + bool expired = expireTime > pendingRequest->StartTime; - if (!expired && !pendingRequest.WaitResultCookies.empty()) { + if (!expired && !pendingRequest->WaitResultCookies.empty()) { return; } - auto* r = pendingRequest.Request->Get()->Request; - auto correlationId = pendingRequest.Request->Get()->CorrelationId; + auto* r = pendingRequest->Request->Get()->Request; + auto correlationId = pendingRequest->Request->Get()->CorrelationId; KAFKA_LOG_D("Send result for correlationId " << correlationId << ". Expired=" << expired); @@ -474,7 +474,7 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) { for(size_t j = 0; j < partitionCount; ++j) { const auto& partitionData = topicData.PartitionData[j]; auto& partitionResponse = topicResponse.PartitionResponses[j]; - const auto& result = pendingRequest.Results[position++]; + const auto& result = pendingRequest->Results[position++]; partitionResponse.Index = partitionData.Index; @@ -505,11 +505,11 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) { Send(Context->ConnectionId, new TEvKafka::TEvResponse(correlationId, response)); - if (!pendingRequest.WaitAcceptingCookies.empty()) { + if (!pendingRequest->WaitAcceptingCookies.empty()) { if (!expired) { TStringBuilder sb; sb << "All TEvWriteResponse were received, but not all TEvWriteAccepted. Unreceived cookies:"; - for(auto cookie : pendingRequest.WaitAcceptingCookies) { + for(auto cookie : pendingRequest->WaitAcceptingCookies) { sb << " " << cookie; } KAFKA_LOG_W(sb); @@ -519,6 +519,13 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) { } } + for(auto cookie : pendingRequest->WaitAcceptingCookies) { + Cookies.erase(cookie); + } + for(auto cookie : pendingRequest->WaitResultCookies) { + Cookies.erase(cookie); + } + PendingRequests.pop_front(); } } diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.h b/ydb/core/kafka_proxy/actors/kafka_produce_actor.h index b37c689096..70f4d09ab9 100644 --- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.h @@ -118,7 +118,7 @@ private: // Logic void ProcessRequests(const TActorContext& ctx); - void ProcessRequest(TPendingRequest& pendingRequest, const TActorContext& ctx); + void ProcessRequest(std::shared_ptr<TPendingRequest> pendingRequest, const TActorContext& ctx); void SendResults(const TActorContext& ctx); @@ -142,6 +142,8 @@ private: TDeque<TEvKafka::TEvProduceRequest::TPtr> Requests; struct TPendingRequest { + using TPtr = std::shared_ptr<TPendingRequest>; + TPendingRequest(TEvKafka::TEvProduceRequest::TPtr request) : Request(request) { } @@ -160,14 +162,14 @@ private: TInstant StartTime; }; - TDeque<TPendingRequest> PendingRequests; + TDeque<TPendingRequest::TPtr> PendingRequests; struct TCookieInfo { TString TopicPath; ui32 PartitionId; size_t Position; - TPendingRequest* Request; + TPendingRequest::TPtr Request; }; std::map<ui64, TCookieInfo> Cookies; diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp index 696d079981..a783cb00d7 100644 --- a/ydb/core/kafka_proxy/kafka_connection.cpp +++ b/ydb/core/kafka_proxy/kafka_connection.cpp @@ -16,22 +16,9 @@ namespace NKafka { using namespace NActors; using namespace NKikimr; - -char Hex(const unsigned char c) { - return c < 10 ? '0' + c : 'A' + c - 10; -} - -void Print(const TString& marker, TBuffer& buffer, ssize_t length) { - TStringBuilder sb; - for (ssize_t i = 0; i < length; ++i) { - char c = buffer.Data()[i]; - if (i > 0) { - sb << ", "; - } - sb << "0x" << Hex((c & 0xF0) >> 4) << Hex(c & 0x0F); - } - KAFKA_LOG_ERROR("Packet " << marker << ": " << sb); -} +static constexpr size_t HeaderSize = sizeof(TKafkaInt16) /* apiKey */ + + sizeof(TKafkaVersion) /* version */ + + sizeof(TKafkaInt32) /* correlationId */; class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNetworkConfig { public: @@ -44,6 +31,10 @@ public: TKafkaInt32 ExpectedSize = 0; TBuffer Buffer; + TKafkaInt16 ApiKey; + TKafkaVersion ApiVersion; + TKafkaInt32 CorrelationId; + TRequestHeaderData Header; std::unique_ptr<TApiMessage> Message; @@ -72,7 +63,7 @@ public: std::unordered_map<ui64, Msg::TPtr> PendingRequests; std::deque<Msg::TPtr> PendingRequestsQueue; - enum EReadSteps { SIZE_READ, SIZE_PREPARE, INFLIGTH_CHECK, MESSAGE_READ, MESSAGE_PROCESS }; + enum EReadSteps { SIZE_READ, SIZE_PREPARE, INFLIGTH_CHECK, HEADER_READ, HEADER_PROCESS, MESSAGE_READ, MESSAGE_PROCESS }; EReadSteps Step; TReadDemand Demand; @@ -230,11 +221,6 @@ protected: Msg::TPtr r = Request; - if (!Context->Authenticated() && RequireAuthentication(static_cast<EApiKey>(Request->Header.RequestApiKey))) { - KAFKA_LOG_ERROR("unauthenticated request: ApiKey=" << Request->Header.RequestApiKey); - return PassAway(); - } - PendingRequestsQueue.push_back(r); PendingRequests[r->Header.CorrelationId] = r; @@ -405,10 +391,20 @@ protected: case SIZE_PREPARE: NormalizeNumber(Request->ExpectedSize); + if (Request->ExpectedSize < 0) { + KAFKA_LOG_ERROR("Wrong message size. Size: " << Request->ExpectedSize); + return PassAway(); + } if ((ui64)Request->ExpectedSize > Context->Config.GetMaxMessageSize()) { - KAFKA_LOG_ERROR("message is big. Size: " << Request->ExpectedSize << ". MaxSize: " << Context->Config.GetMaxMessageSize()); + KAFKA_LOG_ERROR("message is big. Size: " << Request->ExpectedSize << ". MaxSize: " + << Context->Config.GetMaxMessageSize()); return PassAway(); } + if (static_cast<size_t>(Request->ExpectedSize) < HeaderSize) { + KAFKA_LOG_ERROR("message is small. Size: " << Request->ExpectedSize); + return PassAway(); + } + Step = INFLIGTH_CHECK; case INFLIGTH_CHECK: @@ -423,43 +419,58 @@ protected: InflightSize += Request->ExpectedSize; Step = MESSAGE_READ; + case HEADER_READ: + KAFKA_LOG_T("start read header. ExpectedSize=" << Request->ExpectedSize); + + Request->Buffer.Resize(HeaderSize); + Demand = TReadDemand(Request->Buffer.Data(), HeaderSize); + + Step = HEADER_PROCESS; + break; + + case HEADER_PROCESS: + Request->ApiKey = *(TKafkaInt16*)Request->Buffer.Data(); + Request->ApiVersion = *(TKafkaVersion*)(Request->Buffer.Data() + sizeof(TKafkaInt16)); + Request->CorrelationId = *(TKafkaInt32*)(Request->Buffer.Data() + sizeof(TKafkaInt16) + sizeof(TKafkaVersion)); + + NormalizeNumber(Request->ApiKey); + NormalizeNumber(Request->ApiVersion); + NormalizeNumber(Request->CorrelationId); + + if (PendingRequests.contains(Request->CorrelationId)) { + KAFKA_LOG_ERROR("CorrelationId " << Request->CorrelationId << " already processing"); + return PassAway(); + } + if (!Context->Authenticated() && RequireAuthentication(static_cast<EApiKey>(Request->ApiKey))) { + KAFKA_LOG_ERROR("unauthenticated request: ApiKey=" << Request->ApiKey); + return PassAway(); + } + + Step = MESSAGE_READ; + case MESSAGE_READ: KAFKA_LOG_T("start read new message. ExpectedSize=" << Request->ExpectedSize); Request->Buffer.Resize(Request->ExpectedSize); - Demand = TReadDemand(Request->Buffer.Data(), Request->ExpectedSize); + Demand = TReadDemand(Request->Buffer.Data() + HeaderSize, Request->ExpectedSize - HeaderSize); Step = MESSAGE_PROCESS; break; case MESSAGE_PROCESS: - TKafkaInt16 apiKey = *(TKafkaInt16*)Request->Buffer.Data(); - TKafkaVersion apiVersion = *(TKafkaVersion*)(Request->Buffer.Data() + sizeof(TKafkaInt16)); - TKafkaInt32 correlationId = *(TKafkaInt32*)(Request->Buffer.Data() + sizeof(TKafkaInt16) + sizeof(TKafkaInt16)); - - NormalizeNumber(apiKey); - NormalizeNumber(apiVersion); - NormalizeNumber(correlationId); - - KAFKA_LOG_D("received message. ApiKey=" << apiKey << ", Version=" << apiVersion << ", CorrelationId=" << correlationId); - - if (PendingRequests.contains(correlationId)) { - KAFKA_LOG_ERROR("CorrelationId " << correlationId << " already processing"); - return PassAway(); - } - - // Print("received", Request->Buffer, Request->ExpectedSize); + KAFKA_LOG_D("received message. ApiKey=" << Request->ApiKey << ", Version=" << Request->ApiVersion << ", CorrelationId=" << Request->CorrelationId); TKafkaReadable readable(Request->Buffer); - Request->Message = CreateRequest(apiKey); try { - Request->Header.Read(readable, RequestHeaderVersion(apiKey, apiVersion)); - Request->Message->Read(readable, apiVersion); + Request->Message = CreateRequest(Request->ApiKey); + + Request->Header.Read(readable, RequestHeaderVersion(Request->ApiKey, Request->ApiVersion)); + Request->Message->Read(readable, Request->ApiVersion); } catch(const yexception& e) { - KAFKA_LOG_ERROR("error on processing message: ApiKey=" << apiKey - << ", Version=" << apiVersion - << ", CorrelationId=" << correlationId + KAFKA_LOG_ERROR("error on processing message: ApiKey=" << Request->ApiKey + << ", Version=" << Request->ApiVersion + << ", CorrelationId=" << Request->CorrelationId << ", Error=" << e.what()); return PassAway(); } diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp index ac633aa3cc..280bc1cb28 100644 --- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp +++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp @@ -20,7 +20,6 @@ #include <random> - using namespace NKafka; using namespace NYdb; using namespace NYdb::NTable; @@ -32,7 +31,7 @@ static constexpr const char NON_CHARGEABLE_USER_Y[] = "superuser_y@builtin"; static constexpr const char DEFAULT_CLOUD_ID[] = "somecloud"; static constexpr const char DEFAULT_FOLDER_ID[] = "somefolder"; -struct WithSslAndAuth : TKikimrTestSettings { +struct WithSslAndAuth: TKikimrTestSettings { static constexpr bool SSL = true; static constexpr bool AUTH = true; }; @@ -54,8 +53,7 @@ void Print(const TBuffer& buffer) { Cerr << ">>>>> Packet sent: " << sb << Endl; } - -template<class TKikimr, bool secure> +template <class TKikimr, bool secure> class TTestServer { public: TIpPort Port; @@ -83,7 +81,6 @@ public: appConfig.MutableKafkaProxyConfig()->SetMaxMessageSize(1024); appConfig.MutableKafkaProxyConfig()->SetMaxInflightSize(2048); - appConfig.MutablePQConfig()->MutableQuotingConfig()->SetEnableQuoting(true); appConfig.MutablePQConfig()->MutableQuotingConfig()->SetQuotaWaitDurationMs(300); appConfig.MutablePQConfig()->MutableQuotingConfig()->SetPartitionReadQuotaIsTwiceWriteQuota(true); @@ -116,15 +113,7 @@ public: if (secure) { appConfig.MutablePQConfig()->SetRequireCredentialsInNewProtocol(true); } - KikimrServer = std::unique_ptr<TKikimr>(new TKikimr( - std::move(appConfig), - {}, - {}, - false, - nullptr, - nullptr, - 0) - ); + KikimrServer = std::unique_ptr<TKikimr>(new TKikimr(std::move(appConfig), {}, {}, false, nullptr, nullptr, 0)); KikimrServer->GetRuntime()->SetLogPriority(NKikimrServices::KAFKA_PROXY, NActors::NLog::PRI_TRACE); ui16 grpc = KikimrServer->GetPort(); @@ -142,21 +131,38 @@ public: NYdb::NScheme::TSchemeClient schemeClient(*Driver); NYdb::NScheme::TPermissions permissions("user@builtin", {"ydb.generic.read", "ydb.generic.write"}); - auto result = schemeClient.ModifyPermissions("/Root", - NYdb::NScheme::TModifyPermissionsSettings().AddGrantPermissions(permissions) - ).ExtractValueSync(); + auto result = schemeClient + .ModifyPermissions( + "/Root", NYdb::NScheme::TModifyPermissionsSettings().AddGrantPermissions(permissions)) + .ExtractValueSync(); Cerr << result.GetIssues().ToString() << "\n"; UNIT_ASSERT(result.IsSuccess()); } TClient client(*(KikimrServer->ServerSettings)); - UNIT_ASSERT_VALUES_EQUAL(NMsgBusProxy::MSTATUS_OK, - client.AlterUserAttributes("/", "Root", {{"folder_id", DEFAULT_FOLDER_ID}, - {"cloud_id", DEFAULT_CLOUD_ID}, - {"database_id", "root"}})); + UNIT_ASSERT_VALUES_EQUAL( + NMsgBusProxy::MSTATUS_OK, + client.AlterUserAttributes("/", "Root", + {{"folder_id", DEFAULT_FOLDER_ID}, + {"cloud_id", DEFAULT_CLOUD_ID}, + {"database_id", "root"}, + {"serverless_rt_coordination_node_path", "/Coordinator/Root"}, + {"serverless_rt_base_resource_ru", "/ru_Root"}})); - auto status = client.CreateUser("/Root", "ouruser", "ourUserPassword"); - UNIT_ASSERT_VALUES_EQUAL(status, NMsgBusProxy::MSTATUS_OK); + { + auto status = client.CreateUser("/Root", "ouruser", "ourUserPassword"); + UNIT_ASSERT_VALUES_EQUAL(status, NMsgBusProxy::MSTATUS_OK); + + NYdb::NScheme::TSchemeClient schemeClient(*Driver); + NYdb::NScheme::TPermissions permissions("ouruser", {"ydb.generic.read", "ydb.generic.write"}); + + auto result = schemeClient + .ModifyPermissions( + "/Root", NYdb::NScheme::TModifyPermissionsSettings().AddGrantPermissions(permissions)) + .ExtractValueSync(); + Cerr << result.GetIssues().ToString() << "\n"; + UNIT_ASSERT(result.IsSuccess()); + } } public: @@ -217,141 +223,280 @@ std::unique_ptr<TApiMessage> Read(TSocketInput& si, TRequestHeaderData* requestH return response; } +void AssertMessageMeta(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& msg, const TString& field, + const TString& expectedValue) { + if (msg.GetMessageMeta()) { + for (auto& [k, v] : msg.GetMessageMeta()->Fields) { + Cerr << ">>>>> key=" << k << ", value=" << v << Endl; + if (field == k) { + UNIT_ASSERT_STRINGS_EQUAL(v, expectedValue); + return; + } + } + } + UNIT_ASSERT_C(false, "Field " << field << " not found in message meta"); +} + +TMaybe<NTopic::TReadSessionEvent::TDataReceivedEvent> Read(std::shared_ptr<NYdb::NTopic::IReadSession> reader) { + TMaybe<NTopic::TReadSessionEvent::TDataReceivedEvent> result; + while (true) { + auto event = reader->GetEvent(true); + if (!event) + return result; + if (auto dataEvent = std::get_if<NTopic::TReadSessionEvent::TDataReceivedEvent>(&*event)) { + result = *dataEvent; + break; + } else if (auto* lockEv = std::get_if<NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*event)) { + lockEv->Confirm(); + } else if (auto* releaseEv = std::get_if<NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&*event)) { + releaseEv->Confirm(); + } else if (auto* closeSessionEvent = std::get_if<NTopic::TSessionClosedEvent>(&*event)) { + return result; + } + } + return result; +} + +class TTestClient { +public: + TTestClient(ui16 port, const TString clientName = "TestClient") + : Addr("localhost", port) + , Socket(Addr) + , So(Socket) + , Si(Socket) + , Correlation(0) + , ClientName(clientName) { + } + + TApiVersionsResponseData::TPtr ApiVersions() { + Cerr << ">>>>> ApiVersionsRequest\n"; + + TRequestHeaderData header = Header(NKafka::EApiKey::API_VERSIONS, 2); + + TApiVersionsRequestData request; + request.ClientSoftwareName = "SuperTest"; + request.ClientSoftwareVersion = "3100.7.13"; + + return WriteAndRead<TApiVersionsResponseData>(header, request); + } + + TSaslHandshakeResponseData::TPtr SaslHandshake(const TString& mechanism = "PLAIN") { + Cerr << ">>>>> SaslHandshakeRequest\n"; + + TRequestHeaderData header = Header(NKafka::EApiKey::SASL_HANDSHAKE, 1); + + TSaslHandshakeRequestData request; + request.Mechanism = mechanism; + + return WriteAndRead<TSaslHandshakeResponseData>(header, request); + } + + TSaslAuthenticateResponseData::TPtr SaslAuthenticate(const TString& user, const TString& password) { + Cerr << ">>>>> SaslAuthenticateRequestData\n"; + + TStringBuilder authBytes; + authBytes << "ignored" << '\0' << user << '\0' << password; + + TRequestHeaderData header = Header(NKafka::EApiKey::SASL_AUTHENTICATE, 2); + + TSaslAuthenticateRequestData request; + request.AuthBytes = TKafkaRawBytes(authBytes.data(), authBytes.size()); + + return WriteAndRead<TSaslAuthenticateResponseData>(header, request); + } + + TInitProducerIdResponseData::TPtr InitProducerId() { + Cerr << ">>>>> TInitProducerIdRequestData\n"; + + TRequestHeaderData header = Header(NKafka::EApiKey::INIT_PRODUCER_ID, 4); + + TInitProducerIdRequestData request; + request.TransactionTimeoutMs = 5000; + + return WriteAndRead<TInitProducerIdResponseData>(header, request); + } + + TProduceResponseData::TPtr Produce(const TString& topicName, ui32 partition, const TKafkaRecordBatch& batch) { + Cerr << ">>>>> TProduceRequestData\n"; + + TRequestHeaderData header = Header(NKafka::EApiKey::PRODUCE, 9); + + TProduceRequestData request; + request.TopicData.resize(1); + request.TopicData[0].Name = topicName; + request.TopicData[0].PartitionData.resize(1); + request.TopicData[0].PartitionData[0].Index = partition; + request.TopicData[0].PartitionData[0].Records = batch; + + return WriteAndRead<TProduceResponseData>(header, request); + } + + void UnknownApiKey() { + Cerr << ">>>>> Unknown apiKey\n"; + + TRequestHeaderData header; + header.RequestApiKey = 7654; + header.RequestApiVersion = 1; + header.CorrelationId = NextCorrelation(); + header.ClientId = ClientName; + + TApiVersionsRequestData request; + request.ClientSoftwareName = "SuperTest"; + request.ClientSoftwareVersion = "3100.7.13"; + + Write(So, &header, &request); + } + +protected: + ui32 NextCorrelation() { + return Correlation++; + } + + template <class T> + typename T::TPtr WriteAndRead(TRequestHeaderData& header, TApiMessage& request) { + Write(So, &header, &request); + + auto response = Read(Si, &header); + auto* msg = dynamic_cast<T*>(response.release()); + return std::shared_ptr<T>(msg); + } + + TRequestHeaderData Header(NKafka::EApiKey apiKey, TKafkaVersion version) { + TRequestHeaderData header; + header.RequestApiKey = apiKey; + header.RequestApiVersion = version; + header.CorrelationId = NextCorrelation(); + header.ClientId = ClientName; + return header; + } + +private: + TNetworkAddress Addr; + TSocket Socket; + TSocketOutput So; + TSocketInput Si; + + ui32 Correlation; + TString ClientName; +}; + Y_UNIT_TEST_SUITE(KafkaProtocol) { Y_UNIT_TEST(ProduceScenario) { TInsecureTestServer testServer; TString topicName = "/Root/topic-0-test"; + NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); { - NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); - auto result = pqClient.CreateTopic(topicName).ExtractValueSync(); + auto result = + pqClient + .CreateTopic(topicName, + NYdb::NTopic::TCreateTopicSettings().BeginAddConsumer("consumer-0").EndAddConsumer()) + .ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); } - TNetworkAddress addr("localhost", testServer.Port); - TSocket s(addr); - TSocketOutput so(s); - TSocketInput si(s); + auto settings = NTopic::TReadSessionSettings() + .AppendTopics(NTopic::TTopicReadSettings(topicName)) + .ConsumerName("consumer-0"); + auto topicReader = pqClient.CreateReadSession(settings); - size_t correlationId = 0; + TTestClient client(testServer.Port); { - Cerr << ">>>>> ApiVersionsRequest\n"; - - TRequestHeaderData header; - header.RequestApiKey = NKafka::EApiKey::API_VERSIONS; - header.RequestApiVersion = 2; - header.CorrelationId = correlationId++; - header.ClientId = "test"; - - TApiVersionsRequestData request; - request.ClientSoftwareName = "SuperTest"; - request.ClientSoftwareVersion = "3100.7.13"; - - Write(so, &header, &request); - - auto response = Read(si, &header); - auto* msg = dynamic_cast<TApiVersionsResponseData*>(response.get()); + auto msg = client.ApiVersions(); UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 6u); } { - Cerr << ">>>>> SaslHandshakeRequest\n"; + auto msg = client.SaslHandshake(); - TRequestHeaderData header; - header.RequestApiKey = NKafka::EApiKey::SASL_HANDSHAKE; - header.RequestApiVersion = 1; - header.CorrelationId = correlationId++; - header.ClientId = "test"; + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u); + UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN"); + } - TSaslHandshakeRequestData request; - request.Mechanism = "PLAIN"; + { + auto msg = client.SaslAuthenticate("ouruser@/Root", "ourUserPassword"); - Write(so, &header, &request); + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + } - auto response = Read(si, &header); - auto* msg = dynamic_cast<TSaslHandshakeResponseData*>(response.get()); + { + auto msg = client.InitProducerId(); UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); - UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u); - UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN"); } { - Cerr << ">>>>> SaslAuthenticateRequestData\n"; - char authBytes[] = "ignored\0ouruser@/Root\0ourUserPassword"; + TString key = "record-key"; + TString value = "record-value"; + TString headerKey = "header-key"; + TString headerValue = "header-value"; + + TKafkaRecordBatch batch; + batch.BaseOffset = 3; + batch.BaseSequence = 5; + batch.Magic = 2; // Current supported + batch.Records.resize(1); + batch.Records[0].Key = TKafkaRawBytes(key.Data(), key.Size()); + batch.Records[0].Value = TKafkaRawBytes(value.Data(), value.Size()); + batch.Records[0].Headers.resize(1); + batch.Records[0].Headers[0].Key = TKafkaRawBytes(headerKey.Data(), headerKey.Size()); + batch.Records[0].Headers[0].Value = TKafkaRawBytes(headerValue.Data(), headerValue.Size()); + + auto msg = client.Produce(topicName, 0, batch); - TRequestHeaderData header; - header.RequestApiKey = NKafka::EApiKey::SASL_AUTHENTICATE; - header.RequestApiVersion = 2; - header.CorrelationId = correlationId++; - header.ClientId = "test"; - - TSaslAuthenticateRequestData request; - request.AuthBytes = TKafkaRawBytes(authBytes, sizeof(authBytes) - 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Name, topicName); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].Index, 0); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].ErrorCode, + static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); - Write(so, &header, &request); + auto m = Read(topicReader); + UNIT_ASSERT(m); - auto response = Read(si, &header); - auto* msg = dynamic_cast<TSaslAuthenticateResponseData*>(response.get()); + UNIT_ASSERT_EQUAL(m->GetMessages().size(), 1); + auto& m0 = m->GetMessages()[0]; + m0.Commit(); - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_STRINGS_EQUAL(m0.GetData(), value); + AssertMessageMeta(m0, "__key", key); + AssertMessageMeta(m0, headerKey, headerValue); } { - Cerr << ">>>>> TInitProducerIdRequestData\n"; + // Check short topic name - TRequestHeaderData header; - header.RequestApiKey = NKafka::EApiKey::INIT_PRODUCER_ID; - header.RequestApiVersion = 4; - header.CorrelationId = correlationId++; - header.ClientId = "test"; + TKafkaRecordBatch batch; + batch.BaseOffset = 7; + batch.BaseSequence = 11; + batch.Magic = 2; // Current supported + batch.Records.resize(1); + batch.Records[0].Key = "record-key-1"; + batch.Records[0].Value = "record-value-1"; - TInitProducerIdRequestData request; - request.TransactionTimeoutMs = 5000; + auto msg = client.Produce("topic-0-test", 0, batch); - Write(so, &header, &request); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Name, "topic-0-test"); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].Index, 0); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].ErrorCode, + static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); - auto response = Read(si, &header); - auto* msg = dynamic_cast<TInitProducerIdResponseData*>(response.get()); + auto m = Read(topicReader); + UNIT_ASSERT(m); - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_EQUAL(m->GetMessages().size(), 1); + auto& m0 = m->GetMessages()[0]; + m0.Commit(); } { - Cerr << ">>>>> TProduceRequestData\n"; - - TRequestHeaderData header; - header.RequestApiKey = NKafka::EApiKey::PRODUCE; - header.RequestApiVersion = 9; - header.CorrelationId = correlationId++; - header.ClientId = "test-client-random-string"; - - TProduceRequestData request; - request.TopicData.resize(1); - request.TopicData[0].Name = topicName; - request.TopicData[0].PartitionData.resize(1); - request.TopicData[0].PartitionData[0].Index = 0; // Partition id - request.TopicData[0].PartitionData[0].Records.emplace(); - request.TopicData[0].PartitionData[0].Records->BaseOffset = 3; - request.TopicData[0].PartitionData[0].Records->BaseSequence = 5; - request.TopicData[0].PartitionData[0].Records->Magic = 2; // Current supported - request.TopicData[0].PartitionData[0].Records->Records.resize(1); - request.TopicData[0].PartitionData[0].Records->Records[0].Key = "record-key"; - request.TopicData[0].PartitionData[0].Records->Records[0].Value = "record-value"; - - Write(so, &header, &request); - - auto response = Read(si, &header); - auto* msg = dynamic_cast<TProduceResponseData*>(response.get()); - - UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Name, topicName); - UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].Index, 0); - // UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + // Check unknown ApiKey (must be last. close the session) + // expect no exception + client.UnknownApiKey(); } - } -}
\ No newline at end of file + } // Y_UNIT_TEST(ProduceScenario) +} // Y_UNIT_TEST_SUITE(KafkaProtocol) |