aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-09-21 18:54:16 +0300
committertesseract <tesseract@yandex-team.com>2023-09-21 19:40:11 +0300
commitd3c55bfee233908911d7f4eaf3c4f9cfd22a6164 (patch)
tree2614aa0433cac9f3d445240406b92047942faf6e
parentacade28baad76e9046eb4a2d50206b0f029b7a2c (diff)
downloadydb-d3c55bfee233908911d7f4eaf3c4f9cfd22a6164.tar.gz
Fix asan error in test
-rw-r--r--ydb/core/kafka_proxy/actors/actors.h7
-rw-r--r--ydb/core/kafka_proxy/ut/ut_protocol.cpp41
2 files changed, 25 insertions, 23 deletions
diff --git a/ydb/core/kafka_proxy/actors/actors.h b/ydb/core/kafka_proxy/actors/actors.h
index 22b4c0162f7..8baf5ba5b67 100644
--- a/ydb/core/kafka_proxy/actors/actors.h
+++ b/ydb/core/kafka_proxy/actors/actors.h
@@ -44,7 +44,7 @@ struct TContext {
bool Authenticated() { return AuthenticationStep == SUCCESS; }
};
-template<class T>
+template<std::derived_from<TApiMessage> T>
class TMessagePtr {
public:
TMessagePtr(const std::shared_ptr<TBuffer>& buffer, const std::shared_ptr<TApiMessage>& message)
@@ -53,6 +53,11 @@ public:
, Ptr(dynamic_cast<T*>(message.get())) {
}
+ template<std::derived_from<TApiMessage> O>
+ TMessagePtr<O> Cast() {
+ return TMessagePtr<O>(Buffer, Message);
+ }
+
T* operator->() const {
return Ptr;
}
diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
index 64af30b99db..95c5d783124 100644
--- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp
+++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
@@ -1,6 +1,7 @@
#include <library/cpp/testing/unittest/registar.h>
#include "../kafka_messages.h"
+#include "../actors/actors.h"
#include <ydb/services/ydb/ydb_common_ut.h>
#include <ydb/services/ydb/ydb_keys_ut.h>
@@ -223,19 +224,20 @@ void Write(TSocketOutput& so, TRequestHeaderData* header, TApiMessage* request)
so.Flush();
}
-std::unique_ptr<TApiMessage> Read(TSocketInput& si, TRequestHeaderData* requestHeader) {
+template<std::derived_from<TApiMessage> T>
+TMessagePtr<T> Read(TSocketInput& si, TRequestHeaderData* requestHeader) {
TKafkaInt32 size;
si.Read(&size, sizeof(size));
NKafka::NormalizeNumber(size);
- TBuffer buffer;
- buffer.Resize(size);
- si.Load(buffer.Data(), size);
+ auto buffer= std::make_shared<TBuffer>();
+ buffer->Resize(size);
+ si.Load(buffer->Data(), size);
TKafkaVersion headerVersion = ResponseHeaderVersion(requestHeader->RequestApiKey, requestHeader->RequestApiVersion);
- TKafkaReadable readable(buffer);
+ TKafkaReadable readable(*buffer);
TResponseHeaderData header;
header.Read(readable, headerVersion);
@@ -245,7 +247,7 @@ std::unique_ptr<TApiMessage> Read(TSocketInput& si, TRequestHeaderData* requestH
auto response = CreateResponse(requestHeader->RequestApiKey);
response->Read(readable, requestHeader->RequestApiVersion);
- return response;
+ return TMessagePtr<T>(buffer, std::shared_ptr<TApiMessage>(response.release()));
}
void AssertMessageMeta(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& msg, const TString& field,
@@ -294,7 +296,7 @@ public:
, ClientName(clientName) {
}
- TApiVersionsResponseData::TPtr ApiVersions() {
+ TMessagePtr<TApiVersionsResponseData> ApiVersions() {
Cerr << ">>>>> ApiVersionsRequest\n";
TRequestHeaderData header = Header(NKafka::EApiKey::API_VERSIONS, 2);
@@ -306,7 +308,7 @@ public:
return WriteAndRead<TApiVersionsResponseData>(header, request);
}
- TSaslHandshakeResponseData::TPtr SaslHandshake(const TString& mechanism = "PLAIN") {
+ TMessagePtr<TSaslHandshakeResponseData> SaslHandshake(const TString& mechanism = "PLAIN") {
Cerr << ">>>>> SaslHandshakeRequest\n";
TRequestHeaderData header = Header(NKafka::EApiKey::SASL_HANDSHAKE, 1);
@@ -317,7 +319,7 @@ public:
return WriteAndRead<TSaslHandshakeResponseData>(header, request);
}
- TSaslAuthenticateResponseData::TPtr SaslAuthenticate(const TString& user, const TString& password) {
+ TMessagePtr<TSaslAuthenticateResponseData> SaslAuthenticate(const TString& user, const TString& password) {
Cerr << ">>>>> SaslAuthenticateRequestData\n";
TStringBuilder authBytes;
@@ -331,7 +333,7 @@ public:
return WriteAndRead<TSaslAuthenticateResponseData>(header, request);
}
- TInitProducerIdResponseData::TPtr InitProducerId() {
+ TMessagePtr<TInitProducerIdResponseData> InitProducerId() {
Cerr << ">>>>> TInitProducerIdRequestData\n";
TRequestHeaderData header = Header(NKafka::EApiKey::INIT_PRODUCER_ID, 4);
@@ -342,13 +344,13 @@ public:
return WriteAndRead<TInitProducerIdResponseData>(header, request);
}
- TProduceResponseData::TPtr Produce(const TString& topicName, ui32 partition, const TKafkaRecordBatch& batch) {
+ TMessagePtr<TProduceResponseData> Produce(const TString& topicName, ui32 partition, const TKafkaRecordBatch& batch) {
std::vector<std::pair<ui32, TKafkaRecordBatch>> msgs;
msgs.emplace_back(partition, batch);
return Produce(topicName, msgs);
}
- TProduceResponseData::TPtr Produce(const TString& topicName, const std::vector<std::pair<ui32, TKafkaRecordBatch>> msgs) {
+ TMessagePtr<TProduceResponseData> Produce(const TString& topicName, const std::vector<std::pair<ui32, TKafkaRecordBatch>> msgs) {
Cerr << ">>>>> TProduceRequestData\n";
TRequestHeaderData header = Header(NKafka::EApiKey::PRODUCE, 9);
@@ -365,7 +367,7 @@ public:
return WriteAndRead<TProduceResponseData>(header, request);
}
- TListOffsetsResponseData::TPtr ListOffsets(ui64 partitionsCount, const TString& topic) {
+ TMessagePtr<TListOffsetsResponseData> ListOffsets(ui64 partitionsCount, const TString& topic) {
Cerr << ">>>>> TListOffsetsResponseData\n";
TRequestHeaderData header = Header(NKafka::EApiKey::LIST_OFFSETS, 4);
@@ -385,7 +387,7 @@ public:
return WriteAndRead<TListOffsetsResponseData>(header, request);
}
- TFetchResponseData::TPtr Fetch(const TString& topic) {
+ TMessagePtr<TFetchResponseData> Fetch(const TString& topic) {
Cerr << ">>>>> TFetchResponseData\n";
TRequestHeaderData header = Header(NKafka::EApiKey::FETCH, 3);
@@ -406,8 +408,6 @@ public:
request.Topics.push_back(topicReq);
-
-
return WriteAndRead<TFetchResponseData>(header, request);
}
@@ -432,13 +432,10 @@ protected:
return Correlation++;
}
- template <class T>
- typename T::TPtr WriteAndRead(TRequestHeaderData& header, TApiMessage& request) {
+ template <std::derived_from<TApiMessage> T>
+ TMessagePtr<T> WriteAndRead(TRequestHeaderData& header, TApiMessage& request) {
Write(So, &header, &request);
-
- auto response = Read(Si, &header);
- auto* msg = dynamic_cast<T*>(response.release());
- return std::shared_ptr<T>(msg);
+ return Read<T>(Si, &header);
}
TRequestHeaderData Header(NKafka::EApiKey apiKey, TKafkaVersion version) {