diff options
author | tesseract <tesseract@yandex-team.com> | 2023-08-18 11:53:00 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-08-18 16:15:34 +0300 |
commit | f2318846c79045407d30ab0c1dbb44e1fcb749e0 (patch) | |
tree | d33e78ec6cebdef9ac6fc1a82fa9fd4ef9156170 | |
parent | b4f93a19b70aa5276a2ae7e17fc82c37be6c207e (diff) | |
download | ydb-f2318846c79045407d30ab0c1dbb44e1fcb749e0.tar.gz |
More accurate error codes
-rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp | 12 | ||||
-rw-r--r-- | ydb/core/persqueue/writer/writer.cpp | 44 | ||||
-rw-r--r-- | ydb/core/persqueue/writer/writer.h | 13 |
3 files changed, 46 insertions, 23 deletions
diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp index c00421e778..908446b032 100644 --- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp @@ -417,6 +417,16 @@ void TKafkaProduceActor::Handle(TEvPartitionWriter::TEvWriteResponse::TPtr reque } } +EKafkaErrors Convert(TEvPartitionWriter::TEvWriteResponse::EErrors value) { + switch(value) { + case TEvPartitionWriter::TEvWriteResponse::EErrors::PartitionDisconnected: + case TEvPartitionWriter::TEvWriteResponse::EErrors::PartitionNotLocal: + return EKafkaErrors::NOT_LEADER_OR_FOLLOWER; + default: + return EKafkaErrors::UNKNOWN_SERVER_ERROR; + } +} + void TKafkaProduceActor::SendResults(const TActorContext& ctx) { auto expireTime = ctx.Now() - REQUEST_EXPIRATION_INTERVAL; @@ -474,7 +484,7 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) { partitionResponse.BaseOffset = lastResult.GetSeqNo(); } } else { - partitionResponse.ErrorCode = EKafkaErrors::UNKNOWN_SERVER_ERROR; + partitionResponse.ErrorCode = Convert(msg->GetError().Code); partitionResponse.ErrorMessage = msg->GetError().Reason; } } diff --git a/ydb/core/persqueue/writer/writer.cpp b/ydb/core/persqueue/writer/writer.cpp index 6c52faf1aa..1f3beeb1cf 100644 --- a/ydb/core/persqueue/writer/writer.cpp +++ b/ydb/core/persqueue/writer/writer.cpp @@ -122,20 +122,22 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> { return response; } - void BecomeZombie(const TString& error) { + void BecomeZombie(const TEvPartitionWriter::TEvWriteResponse::EErrors errorCode, const TString& error) { + ErrorCode = errorCode; + SendError(error); Become(&TThis::StateZombie); } void SendError(const TString& error) { for (auto cookie : std::exchange(PendingWrite, {})) { - SendWriteResult(error, MakeResponse(cookie)); + SendWriteResult(ErrorCode, error, MakeResponse(cookie)); } for (const auto& [cookie, _] : std::exchange(PendingReserve, {})) { - SendWriteResult(error, MakeResponse(cookie)); + SendWriteResult(ErrorCode, error, MakeResponse(cookie)); } for (const auto& [cookie, _] : std::exchange(Pending, {})) { - SendWriteResult(error, MakeResponse(cookie)); + SendWriteResult(ErrorCode, error, MakeResponse(cookie)); } } @@ -146,7 +148,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> { void InitResult(const TString& reason, NKikimrClient::TResponse&& response) { SendInitResult(reason, std::move(response)); - BecomeZombie("Init error"); + BecomeZombie(TEvPartitionWriter::TEvWriteResponse::EErrors::InternalError, "Init error"); } void InitResult(const TString& ownerCookie, const TEvPartitionWriter::TEvInitResult::TSourceIdInfo& sourceIdInfo) { @@ -158,9 +160,9 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> { Send(Client, new TEvPartitionWriter::TEvWriteResponse(std::forward<Args>(args)...)); } - void WriteResult(const TString& reason, NKikimrClient::TResponse&& response) { - SendWriteResult(reason, std::move(response)); - BecomeZombie("Write error"); + void WriteResult(TEvPartitionWriter::TEvWriteResponse::EErrors errorCode, const TString& reason, NKikimrClient::TResponse&& response) { + SendWriteResult(errorCode, reason, std::move(response)); + BecomeZombie(errorCode, "Write error"); } void WriteResult(NKikimrClient::TResponse&& response) { @@ -172,9 +174,9 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> { Send(Client, new TEvPartitionWriter::TEvWriteAccepted(cookie)); } - void Disconnected() { + void Disconnected(TEvPartitionWriter::TEvWriteResponse::EErrors errorCode) { Send(Client, new TEvPartitionWriter::TEvDisconnected()); - BecomeZombie("Disconnected"); + BecomeZombie(errorCode, "Disconnected"); } /// GetOwnership @@ -337,7 +339,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> { void Reject(TEvPartitionWriter::TEvWriteRequest::TPtr& ev) { const auto cookie = ev->Get()->Record.GetPartitionRequest().GetCookie(); - return WriteResult("Rejected by writer", MakeResponse(cookie)); + return WriteResult(ErrorCode, "Rejected by writer", MakeResponse(cookie)); } void HoldPending(TEvPartitionWriter::TEvWriteRequest::TPtr& ev) { @@ -404,13 +406,13 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> { TString error; if (!BasicCheck(record, error)) { - return WriteResult(error, std::move(record)); + return WriteResult(TEvPartitionWriter::TEvWriteResponse::InternalError, error, std::move(record)); } const auto& response = record.GetPartitionResponse(); if (!response.CmdWriteResultSize()) { if (PendingReserve.empty()) { - return WriteResult("Unexpected ReserveBytes response", std::move(record)); + return WriteResult(TEvPartitionWriter::TEvWriteResponse::InternalError, "Unexpected ReserveBytes response", std::move(record)); } const auto cookie = PendingReserve.begin()->first; @@ -418,14 +420,14 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> { error = TStringBuilder() << "Unexpected cookie at ReserveBytes" << ": expected# " << cookie << ", got# " << response.GetCookie(); - return WriteResult(error, std::move(record)); + return WriteResult(TEvPartitionWriter::TEvWriteResponse::InternalError, error, std::move(record)); } WriteAccepted(cookie); Write(cookie); } else { if (PendingWrite.empty()) { - return WriteResult("Unexpected Write response", std::move(record)); + return WriteResult(TEvPartitionWriter::TEvWriteResponse::InternalError, "Unexpected Write response", std::move(record)); } const auto cookie = PendingWrite.front(); @@ -433,7 +435,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> { error = TStringBuilder() << "Unexpected cookie at Write" << ": expected# " << cookie << ", got# " << response.GetCookie(); - return WriteResult(error, std::move(record)); + return WriteResult(TEvPartitionWriter::TEvWriteResponse::InternalError, error, std::move(record)); } WriteResult(std::move(record)); @@ -447,7 +449,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> { if (msg->Status != NKikimrProto::OK) { LOG_ERROR_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "TPartitionWriter " << TabletId << " (partition=" << PartitionId << ") received TEvClientConnected with status " << ev->Get()->Status); - Disconnected(); + Disconnected(TEvPartitionWriter::TEvWriteResponse::InternalError); return; } @@ -458,13 +460,13 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> { if(*ExpectedGeneration != msg->Generation) { LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "TPartitionWriter " << TabletId << " (partition=" << PartitionId << ") received TEvClientConnected with wrong generation. Expected: " << *ExpectedGeneration << ", received " << msg->Generation); - Disconnected(); + Disconnected(TEvPartitionWriter::TEvWriteResponse::PartitionNotLocal); PassAway(); } if (NActors::TActivationContext::ActorSystem()->NodeId != msg->ServerId.NodeId()) { LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "TPartitionWriter " << TabletId << " (partition=" << PartitionId << ") received TEvClientConnected with wrong NodeId. Expected: " << NActors::TActivationContext::ActorSystem()->NodeId << ", received " << msg->ServerId.NodeId()); - Disconnected(); + Disconnected(TEvPartitionWriter::TEvWriteResponse::PartitionNotLocal); PassAway(); } } @@ -473,7 +475,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> { void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) { if (ev->Get()->TabletId == TabletId) { LOG_DEBUG_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "TPartitionWriter " << TabletId << " (partition=" << PartitionId << ") received TEvClientDestroyed"); - Disconnected(); + Disconnected(TEvPartitionWriter::TEvWriteResponse::PartitionDisconnected); } } @@ -552,6 +554,8 @@ private: TMap<ui64, NKikimrClient::TPersQueueRequest> PendingReserve; TDeque<ui64> PendingWrite; + TEvPartitionWriter::TEvWriteResponse::EErrors ErrorCode = TEvPartitionWriter::TEvWriteResponse::EErrors::InternalError; + }; // TPartitionWriter IActor* CreatePartitionWriter(const TActorId& client, ui64 tabletId, ui32 partitionId, TMaybe<ui32> expectedGeneration, const TString& sourceId, const TPartitionWriterOpts& opts) { diff --git a/ydb/core/persqueue/writer/writer.h b/ydb/core/persqueue/writer/writer.h index 10e1feaf15..6a1a735ab9 100644 --- a/ydb/core/persqueue/writer/writer.h +++ b/ydb/core/persqueue/writer/writer.h @@ -76,10 +76,19 @@ struct TEvPartitionWriter { }; struct TEvWriteResponse: public TEventPB<TEvWriteResponse, NKikimrClient::TResponse, EvWriteResponse> { + enum EErrors { + InternalError, + // Partition located on other node. + PartitionNotLocal, + // Partitition restarted. + PartitionDisconnected + }; + struct TSuccess { }; struct TError { + EErrors Code; TString Reason; }; @@ -93,8 +102,8 @@ struct TEvPartitionWriter { Record = std::move(response); } - explicit TEvWriteResponse(const TString& reason, NKikimrClient::TResponse&& response) - : Result(TError{reason}) + explicit TEvWriteResponse(const EErrors code, const TString& reason, NKikimrClient::TResponse&& response) + : Result(TError{code, reason}) { Record = std::move(response); } |