aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-08-16 08:55:48 +0300
committertesseract <tesseract@yandex-team.com>2023-08-16 09:25:53 +0300
commit6af3d05fb851c95b1cac3a1df107ed32f82f1c5c (patch)
tree378ccf8c3893f2f587fc8ac1b0eec19fc9ea64b6
parent1eb4cf350a6592e674479a84eb38adbccda18760 (diff)
downloadydb-6af3d05fb851c95b1cac3a1df107ed32f82f1c5c.tar.gz
fix nullopt error if key is not set
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp24
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp6
-rw-r--r--ydb/core/kafka_proxy/kafka_connection.cpp23
3 files changed, 35 insertions, 18 deletions
diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
index 7517277ae43..c00421e7787 100644
--- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
@@ -255,24 +255,26 @@ THolder<TEvPartitionWriter::TEvWriteRequest> Convert(const TProduceRequestData::
ui64 totalSize = 0;
for (const auto& record : batch->Records) {
- if (!record.Value) {
- continue;
- }
-
NKikimrPQClient::TDataChunk proto;
for(auto& h : record.Headers) {
- auto res = proto.AddMessageMeta();
- res->set_key(static_cast<const char*>(h.Key->data()), h.Key->size());
- res->set_value(static_cast<const char*>(h.Value->data()), h.Value->size());
+ auto res = proto.AddMessageMeta();
+ if (h.Key) {
+ res->set_key(static_cast<const char*>(h.Key->data()), h.Key->size());
+ }
+ if (h.Value) {
+ res->set_value(static_cast<const char*>(h.Value->data()), h.Value->size());
+ }
}
- {
+ if (record.Key) {
auto res = proto.AddMessageMeta();
res->set_key("__key");
res->set_value(static_cast<const char*>(record.Key->data()), record.Key->size());
}
- proto.SetData(static_cast<const void*>(record.Value->data()), record.Value->size());
+ if (record.Value) {
+ proto.SetData(static_cast<const void*>(record.Value->data()), record.Value->size());
+ }
TString str;
bool res = proto.SerializeToString(&str);
@@ -285,11 +287,11 @@ THolder<TEvPartitionWriter::TEvWriteRequest> Convert(const TProduceRequestData::
w->SetData(str);
w->SetCreateTimeMS(batch->BaseTimestamp + record.TimestampDelta);
w->SetDisableDeduplication(true);
- w->SetUncompressedSize(record.Value->size());
+ w->SetUncompressedSize(record.Value ? record.Value->size() : 0);
w->SetClientDC(clientDC);
w->SetIgnoreQuotaDeadline(true);
- totalSize += record.Value->size();
+ totalSize += record.Value ? record.Value->size() : 0;
}
partitionRequest->SetPutUnitsSize(NPQ::PutUnitsSize(totalSize));
diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
index a3cbc7fb685..a2cb796c6aa 100644
--- a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
@@ -8,6 +8,8 @@
namespace NKafka {
+static constexpr char ERROR_AUTH_BYTES[] = "";
+
NActors::IActor* CreateKafkaSaslAuthActor(const TContext::TPtr context, const ui64 correlationId, const NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const TSaslAuthenticateRequestData* message) {
return new TKafkaSaslAuthActor(context, correlationId, address, message);
}
@@ -43,7 +45,7 @@ void TKafkaSaslAuthActor::Handle(NKikimr::TEvTicketParser::TEvAuthorizeTicketRes
auto responseToClient = std::make_shared<TSaslAuthenticateResponseData>();
responseToClient->ErrorCode = EKafkaErrors::NONE_ERROR;
responseToClient->ErrorMessage = "";
- responseToClient->AuthBytes = TKafkaRawBytes();
+ responseToClient->AuthBytes = TKafkaRawBytes(ERROR_AUTH_BYTES, sizeof(ERROR_AUTH_BYTES));
auto evResponse = std::make_shared<TEvKafka::TEvResponse>(CorrelationId, responseToClient);
@@ -98,7 +100,7 @@ void TKafkaSaslAuthActor::SendAuthFailedAndDie(TString errorMessage, EKafkaError
auto responseToClient = std::make_shared<TSaslAuthenticateResponseData>();
responseToClient->ErrorCode = errorCode;
responseToClient->ErrorMessage = errorMessage;
- responseToClient->AuthBytes = TKafkaRawBytes();
+ responseToClient->AuthBytes = TKafkaRawBytes(ERROR_AUTH_BYTES, sizeof(ERROR_AUTH_BYTES));
auto evResponse = std::make_shared<TEvKafka::TEvResponse>(CorrelationId, responseToClient);
auto authResult = new TEvKafka::TEvAuthResult(EAuthSteps::FAILED, evResponse, nullptr, "", errorMessage);
diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp
index 92ddbb5e209..bc951d1b95f 100644
--- a/ydb/core/kafka_proxy/kafka_connection.cpp
+++ b/ydb/core/kafka_proxy/kafka_connection.cpp
@@ -64,6 +64,7 @@ public:
bool ConnectionEstablished = false;
bool CloseConnection = false;
+ bool ActorActive = true;
NAddressClassifier::TLabeledAddressClassifier::TConstPtr DatacenterClassifier;
@@ -106,6 +107,10 @@ public:
void PassAway() override {
KAFKA_LOG_D("PassAway");
+ if (!ActorActive) {
+ return;
+ }
+ ActorActive = false;
if (ConnectionEstablished) {
ConnectionEstablished = false;
@@ -348,13 +353,21 @@ protected:
TBufferedWriter buffer(Socket.Get(), Context->Config.GetPacketSize());
TKafkaWritable writable(buffer);
- writable << size;
- responseHeader.Write(writable, headerVersion);
- reply->Write(writable, version);
+ try {
+ writable << size;
+ responseHeader.Write(writable, headerVersion);
+ reply->Write(writable, version);
- buffer.flush();
+ buffer.flush();
- KAFKA_LOG_D("Sent reply: ApiKey=" << header->RequestApiKey << ", Version=" << version << ", Correlation=" << responseHeader.CorrelationId << ", Size=" << size);
+ KAFKA_LOG_D("Sent reply: ApiKey=" << header->RequestApiKey << ", Version=" << version << ", Correlation=" << responseHeader.CorrelationId << ", Size=" << size);
+ } catch(const yexception& e) {
+ KAFKA_LOG_ERROR("error on processing response: ApiKey=" << reply->ApiKey()
+ << ", Version=" << version
+ << ", CorrelationId=" << header->CorrelationId
+ << ", Error=" << e.what());
+ return PassAway();
+ }
}
void DoRead(const TActorContext& ctx) {