aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-08-18 11:53:00 +0300
committertesseract <tesseract@yandex-team.com>2023-08-18 16:15:34 +0300
commitf2318846c79045407d30ab0c1dbb44e1fcb749e0 (patch)
treed33e78ec6cebdef9ac6fc1a82fa9fd4ef9156170
parentb4f93a19b70aa5276a2ae7e17fc82c37be6c207e (diff)
downloadydb-f2318846c79045407d30ab0c1dbb44e1fcb749e0.tar.gz
More accurate error codes
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp12
-rw-r--r--ydb/core/persqueue/writer/writer.cpp44
-rw-r--r--ydb/core/persqueue/writer/writer.h13
3 files changed, 46 insertions, 23 deletions
diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
index c00421e778..908446b032 100644
--- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
@@ -417,6 +417,16 @@ void TKafkaProduceActor::Handle(TEvPartitionWriter::TEvWriteResponse::TPtr reque
}
}
+EKafkaErrors Convert(TEvPartitionWriter::TEvWriteResponse::EErrors value) {
+ switch(value) {
+ case TEvPartitionWriter::TEvWriteResponse::EErrors::PartitionDisconnected:
+ case TEvPartitionWriter::TEvWriteResponse::EErrors::PartitionNotLocal:
+ return EKafkaErrors::NOT_LEADER_OR_FOLLOWER;
+ default:
+ return EKafkaErrors::UNKNOWN_SERVER_ERROR;
+ }
+}
+
void TKafkaProduceActor::SendResults(const TActorContext& ctx) {
auto expireTime = ctx.Now() - REQUEST_EXPIRATION_INTERVAL;
@@ -474,7 +484,7 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) {
partitionResponse.BaseOffset = lastResult.GetSeqNo();
}
} else {
- partitionResponse.ErrorCode = EKafkaErrors::UNKNOWN_SERVER_ERROR;
+ partitionResponse.ErrorCode = Convert(msg->GetError().Code);
partitionResponse.ErrorMessage = msg->GetError().Reason;
}
}
diff --git a/ydb/core/persqueue/writer/writer.cpp b/ydb/core/persqueue/writer/writer.cpp
index 6c52faf1aa..1f3beeb1cf 100644
--- a/ydb/core/persqueue/writer/writer.cpp
+++ b/ydb/core/persqueue/writer/writer.cpp
@@ -122,20 +122,22 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> {
return response;
}
- void BecomeZombie(const TString& error) {
+ void BecomeZombie(const TEvPartitionWriter::TEvWriteResponse::EErrors errorCode, const TString& error) {
+ ErrorCode = errorCode;
+
SendError(error);
Become(&TThis::StateZombie);
}
void SendError(const TString& error) {
for (auto cookie : std::exchange(PendingWrite, {})) {
- SendWriteResult(error, MakeResponse(cookie));
+ SendWriteResult(ErrorCode, error, MakeResponse(cookie));
}
for (const auto& [cookie, _] : std::exchange(PendingReserve, {})) {
- SendWriteResult(error, MakeResponse(cookie));
+ SendWriteResult(ErrorCode, error, MakeResponse(cookie));
}
for (const auto& [cookie, _] : std::exchange(Pending, {})) {
- SendWriteResult(error, MakeResponse(cookie));
+ SendWriteResult(ErrorCode, error, MakeResponse(cookie));
}
}
@@ -146,7 +148,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> {
void InitResult(const TString& reason, NKikimrClient::TResponse&& response) {
SendInitResult(reason, std::move(response));
- BecomeZombie("Init error");
+ BecomeZombie(TEvPartitionWriter::TEvWriteResponse::EErrors::InternalError, "Init error");
}
void InitResult(const TString& ownerCookie, const TEvPartitionWriter::TEvInitResult::TSourceIdInfo& sourceIdInfo) {
@@ -158,9 +160,9 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> {
Send(Client, new TEvPartitionWriter::TEvWriteResponse(std::forward<Args>(args)...));
}
- void WriteResult(const TString& reason, NKikimrClient::TResponse&& response) {
- SendWriteResult(reason, std::move(response));
- BecomeZombie("Write error");
+ void WriteResult(TEvPartitionWriter::TEvWriteResponse::EErrors errorCode, const TString& reason, NKikimrClient::TResponse&& response) {
+ SendWriteResult(errorCode, reason, std::move(response));
+ BecomeZombie(errorCode, "Write error");
}
void WriteResult(NKikimrClient::TResponse&& response) {
@@ -172,9 +174,9 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> {
Send(Client, new TEvPartitionWriter::TEvWriteAccepted(cookie));
}
- void Disconnected() {
+ void Disconnected(TEvPartitionWriter::TEvWriteResponse::EErrors errorCode) {
Send(Client, new TEvPartitionWriter::TEvDisconnected());
- BecomeZombie("Disconnected");
+ BecomeZombie(errorCode, "Disconnected");
}
/// GetOwnership
@@ -337,7 +339,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> {
void Reject(TEvPartitionWriter::TEvWriteRequest::TPtr& ev) {
const auto cookie = ev->Get()->Record.GetPartitionRequest().GetCookie();
- return WriteResult("Rejected by writer", MakeResponse(cookie));
+ return WriteResult(ErrorCode, "Rejected by writer", MakeResponse(cookie));
}
void HoldPending(TEvPartitionWriter::TEvWriteRequest::TPtr& ev) {
@@ -404,13 +406,13 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> {
TString error;
if (!BasicCheck(record, error)) {
- return WriteResult(error, std::move(record));
+ return WriteResult(TEvPartitionWriter::TEvWriteResponse::InternalError, error, std::move(record));
}
const auto& response = record.GetPartitionResponse();
if (!response.CmdWriteResultSize()) {
if (PendingReserve.empty()) {
- return WriteResult("Unexpected ReserveBytes response", std::move(record));
+ return WriteResult(TEvPartitionWriter::TEvWriteResponse::InternalError, "Unexpected ReserveBytes response", std::move(record));
}
const auto cookie = PendingReserve.begin()->first;
@@ -418,14 +420,14 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> {
error = TStringBuilder() << "Unexpected cookie at ReserveBytes"
<< ": expected# " << cookie
<< ", got# " << response.GetCookie();
- return WriteResult(error, std::move(record));
+ return WriteResult(TEvPartitionWriter::TEvWriteResponse::InternalError, error, std::move(record));
}
WriteAccepted(cookie);
Write(cookie);
} else {
if (PendingWrite.empty()) {
- return WriteResult("Unexpected Write response", std::move(record));
+ return WriteResult(TEvPartitionWriter::TEvWriteResponse::InternalError, "Unexpected Write response", std::move(record));
}
const auto cookie = PendingWrite.front();
@@ -433,7 +435,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> {
error = TStringBuilder() << "Unexpected cookie at Write"
<< ": expected# " << cookie
<< ", got# " << response.GetCookie();
- return WriteResult(error, std::move(record));
+ return WriteResult(TEvPartitionWriter::TEvWriteResponse::InternalError, error, std::move(record));
}
WriteResult(std::move(record));
@@ -447,7 +449,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> {
if (msg->Status != NKikimrProto::OK) {
LOG_ERROR_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "TPartitionWriter " << TabletId << " (partition=" << PartitionId << ") received TEvClientConnected with status " << ev->Get()->Status);
- Disconnected();
+ Disconnected(TEvPartitionWriter::TEvWriteResponse::InternalError);
return;
}
@@ -458,13 +460,13 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> {
if(*ExpectedGeneration != msg->Generation)
{
LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "TPartitionWriter " << TabletId << " (partition=" << PartitionId << ") received TEvClientConnected with wrong generation. Expected: " << *ExpectedGeneration << ", received " << msg->Generation);
- Disconnected();
+ Disconnected(TEvPartitionWriter::TEvWriteResponse::PartitionNotLocal);
PassAway();
}
if (NActors::TActivationContext::ActorSystem()->NodeId != msg->ServerId.NodeId())
{
LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "TPartitionWriter " << TabletId << " (partition=" << PartitionId << ") received TEvClientConnected with wrong NodeId. Expected: " << NActors::TActivationContext::ActorSystem()->NodeId << ", received " << msg->ServerId.NodeId());
- Disconnected();
+ Disconnected(TEvPartitionWriter::TEvWriteResponse::PartitionNotLocal);
PassAway();
}
}
@@ -473,7 +475,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> {
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) {
if (ev->Get()->TabletId == TabletId) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "TPartitionWriter " << TabletId << " (partition=" << PartitionId << ") received TEvClientDestroyed");
- Disconnected();
+ Disconnected(TEvPartitionWriter::TEvWriteResponse::PartitionDisconnected);
}
}
@@ -552,6 +554,8 @@ private:
TMap<ui64, NKikimrClient::TPersQueueRequest> PendingReserve;
TDeque<ui64> PendingWrite;
+ TEvPartitionWriter::TEvWriteResponse::EErrors ErrorCode = TEvPartitionWriter::TEvWriteResponse::EErrors::InternalError;
+
}; // TPartitionWriter
IActor* CreatePartitionWriter(const TActorId& client, ui64 tabletId, ui32 partitionId, TMaybe<ui32> expectedGeneration, const TString& sourceId, const TPartitionWriterOpts& opts) {
diff --git a/ydb/core/persqueue/writer/writer.h b/ydb/core/persqueue/writer/writer.h
index 10e1feaf15..6a1a735ab9 100644
--- a/ydb/core/persqueue/writer/writer.h
+++ b/ydb/core/persqueue/writer/writer.h
@@ -76,10 +76,19 @@ struct TEvPartitionWriter {
};
struct TEvWriteResponse: public TEventPB<TEvWriteResponse, NKikimrClient::TResponse, EvWriteResponse> {
+ enum EErrors {
+ InternalError,
+ // Partition located on other node.
+ PartitionNotLocal,
+ // Partitition restarted.
+ PartitionDisconnected
+ };
+
struct TSuccess {
};
struct TError {
+ EErrors Code;
TString Reason;
};
@@ -93,8 +102,8 @@ struct TEvPartitionWriter {
Record = std::move(response);
}
- explicit TEvWriteResponse(const TString& reason, NKikimrClient::TResponse&& response)
- : Result(TError{reason})
+ explicit TEvWriteResponse(const EErrors code, const TString& reason, NKikimrClient::TResponse&& response)
+ : Result(TError{code, reason})
{
Record = std::move(response);
}