diff options
author | tesseract <tesseract@yandex-team.com> | 2023-08-16 08:55:48 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-08-16 09:25:53 +0300 |
commit | 6af3d05fb851c95b1cac3a1df107ed32f82f1c5c (patch) | |
tree | 378ccf8c3893f2f587fc8ac1b0eec19fc9ea64b6 | |
parent | 1eb4cf350a6592e674479a84eb38adbccda18760 (diff) | |
download | ydb-6af3d05fb851c95b1cac3a1df107ed32f82f1c5c.tar.gz |
fix nullopt error if key is not set
-rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp | 24 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/kafka_connection.cpp | 23 |
3 files changed, 35 insertions, 18 deletions
diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp index 7517277ae43..c00421e7787 100644 --- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp @@ -255,24 +255,26 @@ THolder<TEvPartitionWriter::TEvWriteRequest> Convert(const TProduceRequestData:: ui64 totalSize = 0; for (const auto& record : batch->Records) { - if (!record.Value) { - continue; - } - NKikimrPQClient::TDataChunk proto; for(auto& h : record.Headers) { - auto res = proto.AddMessageMeta(); - res->set_key(static_cast<const char*>(h.Key->data()), h.Key->size()); - res->set_value(static_cast<const char*>(h.Value->data()), h.Value->size()); + auto res = proto.AddMessageMeta(); + if (h.Key) { + res->set_key(static_cast<const char*>(h.Key->data()), h.Key->size()); + } + if (h.Value) { + res->set_value(static_cast<const char*>(h.Value->data()), h.Value->size()); + } } - { + if (record.Key) { auto res = proto.AddMessageMeta(); res->set_key("__key"); res->set_value(static_cast<const char*>(record.Key->data()), record.Key->size()); } - proto.SetData(static_cast<const void*>(record.Value->data()), record.Value->size()); + if (record.Value) { + proto.SetData(static_cast<const void*>(record.Value->data()), record.Value->size()); + } TString str; bool res = proto.SerializeToString(&str); @@ -285,11 +287,11 @@ THolder<TEvPartitionWriter::TEvWriteRequest> Convert(const TProduceRequestData:: w->SetData(str); w->SetCreateTimeMS(batch->BaseTimestamp + record.TimestampDelta); w->SetDisableDeduplication(true); - w->SetUncompressedSize(record.Value->size()); + w->SetUncompressedSize(record.Value ? record.Value->size() : 0); w->SetClientDC(clientDC); w->SetIgnoreQuotaDeadline(true); - totalSize += record.Value->size(); + totalSize += record.Value ? record.Value->size() : 0; } partitionRequest->SetPutUnitsSize(NPQ::PutUnitsSize(totalSize)); diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp index a3cbc7fb685..a2cb796c6aa 100644 --- a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp @@ -8,6 +8,8 @@ namespace NKafka { +static constexpr char ERROR_AUTH_BYTES[] = ""; + NActors::IActor* CreateKafkaSaslAuthActor(const TContext::TPtr context, const ui64 correlationId, const NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const TSaslAuthenticateRequestData* message) { return new TKafkaSaslAuthActor(context, correlationId, address, message); } @@ -43,7 +45,7 @@ void TKafkaSaslAuthActor::Handle(NKikimr::TEvTicketParser::TEvAuthorizeTicketRes auto responseToClient = std::make_shared<TSaslAuthenticateResponseData>(); responseToClient->ErrorCode = EKafkaErrors::NONE_ERROR; responseToClient->ErrorMessage = ""; - responseToClient->AuthBytes = TKafkaRawBytes(); + responseToClient->AuthBytes = TKafkaRawBytes(ERROR_AUTH_BYTES, sizeof(ERROR_AUTH_BYTES)); auto evResponse = std::make_shared<TEvKafka::TEvResponse>(CorrelationId, responseToClient); @@ -98,7 +100,7 @@ void TKafkaSaslAuthActor::SendAuthFailedAndDie(TString errorMessage, EKafkaError auto responseToClient = std::make_shared<TSaslAuthenticateResponseData>(); responseToClient->ErrorCode = errorCode; responseToClient->ErrorMessage = errorMessage; - responseToClient->AuthBytes = TKafkaRawBytes(); + responseToClient->AuthBytes = TKafkaRawBytes(ERROR_AUTH_BYTES, sizeof(ERROR_AUTH_BYTES)); auto evResponse = std::make_shared<TEvKafka::TEvResponse>(CorrelationId, responseToClient); auto authResult = new TEvKafka::TEvAuthResult(EAuthSteps::FAILED, evResponse, nullptr, "", errorMessage); diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp index 92ddbb5e209..bc951d1b95f 100644 --- a/ydb/core/kafka_proxy/kafka_connection.cpp +++ b/ydb/core/kafka_proxy/kafka_connection.cpp @@ -64,6 +64,7 @@ public: bool ConnectionEstablished = false; bool CloseConnection = false; + bool ActorActive = true; NAddressClassifier::TLabeledAddressClassifier::TConstPtr DatacenterClassifier; @@ -106,6 +107,10 @@ public: void PassAway() override { KAFKA_LOG_D("PassAway"); + if (!ActorActive) { + return; + } + ActorActive = false; if (ConnectionEstablished) { ConnectionEstablished = false; @@ -348,13 +353,21 @@ protected: TBufferedWriter buffer(Socket.Get(), Context->Config.GetPacketSize()); TKafkaWritable writable(buffer); - writable << size; - responseHeader.Write(writable, headerVersion); - reply->Write(writable, version); + try { + writable << size; + responseHeader.Write(writable, headerVersion); + reply->Write(writable, version); - buffer.flush(); + buffer.flush(); - KAFKA_LOG_D("Sent reply: ApiKey=" << header->RequestApiKey << ", Version=" << version << ", Correlation=" << responseHeader.CorrelationId << ", Size=" << size); + KAFKA_LOG_D("Sent reply: ApiKey=" << header->RequestApiKey << ", Version=" << version << ", Correlation=" << responseHeader.CorrelationId << ", Size=" << size); + } catch(const yexception& e) { + KAFKA_LOG_ERROR("error on processing response: ApiKey=" << reply->ApiKey() + << ", Version=" << version + << ", CorrelationId=" << header->CorrelationId + << ", Error=" << e.what()); + return PassAway(); + } } void DoRead(const TActorContext& ctx) { |