diff options
author | tesseract <tesseract@yandex-team.com> | 2023-09-21 18:54:16 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-09-21 19:40:11 +0300 |
commit | d3c55bfee233908911d7f4eaf3c4f9cfd22a6164 (patch) | |
tree | 2614aa0433cac9f3d445240406b92047942faf6e | |
parent | acade28baad76e9046eb4a2d50206b0f029b7a2c (diff) | |
download | ydb-d3c55bfee233908911d7f4eaf3c4f9cfd22a6164.tar.gz |
Fix asan error in test
-rw-r--r-- | ydb/core/kafka_proxy/actors/actors.h | 7 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/ut/ut_protocol.cpp | 41 |
2 files changed, 25 insertions, 23 deletions
diff --git a/ydb/core/kafka_proxy/actors/actors.h b/ydb/core/kafka_proxy/actors/actors.h index 22b4c0162f7..8baf5ba5b67 100644 --- a/ydb/core/kafka_proxy/actors/actors.h +++ b/ydb/core/kafka_proxy/actors/actors.h @@ -44,7 +44,7 @@ struct TContext { bool Authenticated() { return AuthenticationStep == SUCCESS; } }; -template<class T> +template<std::derived_from<TApiMessage> T> class TMessagePtr { public: TMessagePtr(const std::shared_ptr<TBuffer>& buffer, const std::shared_ptr<TApiMessage>& message) @@ -53,6 +53,11 @@ public: , Ptr(dynamic_cast<T*>(message.get())) { } + template<std::derived_from<TApiMessage> O> + TMessagePtr<O> Cast() { + return TMessagePtr<O>(Buffer, Message); + } + T* operator->() const { return Ptr; } diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp index 64af30b99db..95c5d783124 100644 --- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp +++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp @@ -1,6 +1,7 @@ #include <library/cpp/testing/unittest/registar.h> #include "../kafka_messages.h" +#include "../actors/actors.h" #include <ydb/services/ydb/ydb_common_ut.h> #include <ydb/services/ydb/ydb_keys_ut.h> @@ -223,19 +224,20 @@ void Write(TSocketOutput& so, TRequestHeaderData* header, TApiMessage* request) so.Flush(); } -std::unique_ptr<TApiMessage> Read(TSocketInput& si, TRequestHeaderData* requestHeader) { +template<std::derived_from<TApiMessage> T> +TMessagePtr<T> Read(TSocketInput& si, TRequestHeaderData* requestHeader) { TKafkaInt32 size; si.Read(&size, sizeof(size)); NKafka::NormalizeNumber(size); - TBuffer buffer; - buffer.Resize(size); - si.Load(buffer.Data(), size); + auto buffer= std::make_shared<TBuffer>(); + buffer->Resize(size); + si.Load(buffer->Data(), size); TKafkaVersion headerVersion = ResponseHeaderVersion(requestHeader->RequestApiKey, requestHeader->RequestApiVersion); - TKafkaReadable readable(buffer); + TKafkaReadable readable(*buffer); TResponseHeaderData header; header.Read(readable, headerVersion); @@ -245,7 +247,7 @@ std::unique_ptr<TApiMessage> Read(TSocketInput& si, TRequestHeaderData* requestH auto response = CreateResponse(requestHeader->RequestApiKey); response->Read(readable, requestHeader->RequestApiVersion); - return response; + return TMessagePtr<T>(buffer, std::shared_ptr<TApiMessage>(response.release())); } void AssertMessageMeta(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& msg, const TString& field, @@ -294,7 +296,7 @@ public: , ClientName(clientName) { } - TApiVersionsResponseData::TPtr ApiVersions() { + TMessagePtr<TApiVersionsResponseData> ApiVersions() { Cerr << ">>>>> ApiVersionsRequest\n"; TRequestHeaderData header = Header(NKafka::EApiKey::API_VERSIONS, 2); @@ -306,7 +308,7 @@ public: return WriteAndRead<TApiVersionsResponseData>(header, request); } - TSaslHandshakeResponseData::TPtr SaslHandshake(const TString& mechanism = "PLAIN") { + TMessagePtr<TSaslHandshakeResponseData> SaslHandshake(const TString& mechanism = "PLAIN") { Cerr << ">>>>> SaslHandshakeRequest\n"; TRequestHeaderData header = Header(NKafka::EApiKey::SASL_HANDSHAKE, 1); @@ -317,7 +319,7 @@ public: return WriteAndRead<TSaslHandshakeResponseData>(header, request); } - TSaslAuthenticateResponseData::TPtr SaslAuthenticate(const TString& user, const TString& password) { + TMessagePtr<TSaslAuthenticateResponseData> SaslAuthenticate(const TString& user, const TString& password) { Cerr << ">>>>> SaslAuthenticateRequestData\n"; TStringBuilder authBytes; @@ -331,7 +333,7 @@ public: return WriteAndRead<TSaslAuthenticateResponseData>(header, request); } - TInitProducerIdResponseData::TPtr InitProducerId() { + TMessagePtr<TInitProducerIdResponseData> InitProducerId() { Cerr << ">>>>> TInitProducerIdRequestData\n"; TRequestHeaderData header = Header(NKafka::EApiKey::INIT_PRODUCER_ID, 4); @@ -342,13 +344,13 @@ public: return WriteAndRead<TInitProducerIdResponseData>(header, request); } - TProduceResponseData::TPtr Produce(const TString& topicName, ui32 partition, const TKafkaRecordBatch& batch) { + TMessagePtr<TProduceResponseData> Produce(const TString& topicName, ui32 partition, const TKafkaRecordBatch& batch) { std::vector<std::pair<ui32, TKafkaRecordBatch>> msgs; msgs.emplace_back(partition, batch); return Produce(topicName, msgs); } - TProduceResponseData::TPtr Produce(const TString& topicName, const std::vector<std::pair<ui32, TKafkaRecordBatch>> msgs) { + TMessagePtr<TProduceResponseData> Produce(const TString& topicName, const std::vector<std::pair<ui32, TKafkaRecordBatch>> msgs) { Cerr << ">>>>> TProduceRequestData\n"; TRequestHeaderData header = Header(NKafka::EApiKey::PRODUCE, 9); @@ -365,7 +367,7 @@ public: return WriteAndRead<TProduceResponseData>(header, request); } - TListOffsetsResponseData::TPtr ListOffsets(ui64 partitionsCount, const TString& topic) { + TMessagePtr<TListOffsetsResponseData> ListOffsets(ui64 partitionsCount, const TString& topic) { Cerr << ">>>>> TListOffsetsResponseData\n"; TRequestHeaderData header = Header(NKafka::EApiKey::LIST_OFFSETS, 4); @@ -385,7 +387,7 @@ public: return WriteAndRead<TListOffsetsResponseData>(header, request); } - TFetchResponseData::TPtr Fetch(const TString& topic) { + TMessagePtr<TFetchResponseData> Fetch(const TString& topic) { Cerr << ">>>>> TFetchResponseData\n"; TRequestHeaderData header = Header(NKafka::EApiKey::FETCH, 3); @@ -406,8 +408,6 @@ public: request.Topics.push_back(topicReq); - - return WriteAndRead<TFetchResponseData>(header, request); } @@ -432,13 +432,10 @@ protected: return Correlation++; } - template <class T> - typename T::TPtr WriteAndRead(TRequestHeaderData& header, TApiMessage& request) { + template <std::derived_from<TApiMessage> T> + TMessagePtr<T> WriteAndRead(TRequestHeaderData& header, TApiMessage& request) { Write(So, &header, &request); - - auto response = Read(Si, &header); - auto* msg = dynamic_cast<T*>(response.release()); - return std::shared_ptr<T>(msg); + return Read<T>(Si, &header); } TRequestHeaderData Header(NKafka::EApiKey apiKey, TKafkaVersion version) { |