aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorildar-khisam <ikhis@ydb.tech>2023-04-14 12:26:15 +0300
committerildar-khisam <ikhis@ydb.tech>2023-04-14 12:26:15 +0300
commit37e95342a6c2805bc5cc7ae59045838258f1e3bf (patch)
treeb885416567855c472ea1ceb848a6d0dc271351e3
parentb079747b8a665c7e1e9573163b558ffa7bb296f7 (diff)
downloadydb-37e95342a6c2805bc5cc7ae59045838258f1e3bf.tar.gz
specify error codes
specify error codes
-rw-r--r--ydb/public/api/protos/draft/persqueue_error_codes.proto5
-rw-r--r--ydb/public/api/protos/persqueue_error_codes_v1.proto13
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.cpp8
-rw-r--r--ydb/services/datastreams/datastreams_proxy.cpp16
-rw-r--r--ydb/services/lib/actors/pq_schema_actor.h10
-rw-r--r--ydb/services/persqueue_v1/actors/commit_offset_actor.cpp5
-rw-r--r--ydb/services/persqueue_v1/actors/partition_actor.cpp8
-rw-r--r--ydb/services/persqueue_v1/actors/persqueue_utils.cpp59
-rw-r--r--ydb/services/persqueue_v1/actors/persqueue_utils.h1
-rw-r--r--ydb/services/persqueue_v1/actors/read_info_actor.cpp1
-rw-r--r--ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp2
-rw-r--r--ydb/services/persqueue_v1/actors/read_session_actor.ipp2
-rw-r--r--ydb/services/persqueue_v1/actors/schema_actors.cpp4
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.ipp8
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp16
15 files changed, 59 insertions, 99 deletions
diff --git a/ydb/public/api/protos/draft/persqueue_error_codes.proto b/ydb/public/api/protos/draft/persqueue_error_codes.proto
index f637fba4682..41eb3695620 100644
--- a/ydb/public/api/protos/draft/persqueue_error_codes.proto
+++ b/ydb/public/api/protos/draft/persqueue_error_codes.proto
@@ -25,6 +25,7 @@ enum EErrorCode {
READ_ERROR_TOO_BIG_OFFSET = 12;
SET_OFFSET_ERROR_COMMIT_TO_FUTURE = 13;
+ SET_OFFSET_ERROR_COMMIT_TO_PAST = 25;
TABLET_IS_DROPPED = 14;
@@ -40,7 +41,9 @@ enum EErrorCode {
CREATE_TIMEOUT = 22; // TODO: move to pqlib codes
IDLE_TIMEOUT = 23; // TODO: move to pqlib codes
- SET_OFFSET_ERROR_COMMIT_TO_PAST = 25;
+ PREFERRED_CLUSTER_MISMATCHED = 26;
+
+ TABLET_PIPE_DISCONNECTED = 27;
ERROR = 100;
}
diff --git a/ydb/public/api/protos/persqueue_error_codes_v1.proto b/ydb/public/api/protos/persqueue_error_codes_v1.proto
index 3620c4799bd..94321dc08ad 100644
--- a/ydb/public/api/protos/persqueue_error_codes_v1.proto
+++ b/ydb/public/api/protos/persqueue_error_codes_v1.proto
@@ -21,12 +21,14 @@ enum ErrorCode {
CREATE_SESSION_ALREADY_LOCKED = 500006;
DELETE_SESSION_NO_SESSION = 500007;
- READ_ERROR_IN_PROGRESS = 500008;
+ // removed READ_ERROR_IN_PROGRESS = 500008;
READ_ERROR_NO_SESSION = 500009;
+ // READ_TIMEOUT = 10;
READ_ERROR_TOO_SMALL_OFFSET = 500011;
READ_ERROR_TOO_BIG_OFFSET = 500012;
SET_OFFSET_ERROR_COMMIT_TO_FUTURE = 500013;
+ SET_OFFSET_ERROR_COMMIT_TO_PAST = 500025;
TABLET_IS_DROPPED = 500014;
@@ -38,10 +40,13 @@ enum ErrorCode {
CLUSTER_DISABLED = 500020;
WRONG_PARTITION_NUMBER = 500021;
- PREFERRED_CLUSTER_MISMATCHED = 500022;
- TABLET_PIPE_DISCONNECTED = 500023;
- SET_OFFSET_ERROR_COMMIT_TO_PAST = 500025;
+ // CREATE_TIMEOUT = 22;
+ // IDLE_TIMEOUT = 23;
+
+ PREFERRED_CLUSTER_MISMATCHED = 500026;
+
+ TABLET_PIPE_DISCONNECTED = 500027;
ERROR = 500100;
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.cpp
index 17bc321c251..3c46ef1605f 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.cpp
@@ -6,12 +6,12 @@ namespace NYdb::NPersQueue {
ERetryErrorClass GetRetryErrorClass(EStatus status) {
switch (status) {
- case EStatus::SUCCESS:
- case EStatus::INTERNAL_ERROR:
+ case EStatus::SUCCESS: // NoRetry?
+ case EStatus::INTERNAL_ERROR: // NoRetry?
case EStatus::ABORTED:
case EStatus::UNAVAILABLE:
- case EStatus::GENERIC_ERROR:
- case EStatus::BAD_SESSION:
+ case EStatus::GENERIC_ERROR: // NoRetry?
+ case EStatus::BAD_SESSION: // NoRetry?
case EStatus::SESSION_EXPIRED:
case EStatus::CANCELLED:
case EStatus::UNDETERMINED:
diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp
index 6a5c3f9bc4d..86ce34c8e35 100644
--- a/ydb/services/datastreams/datastreams_proxy.cpp
+++ b/ydb/services/datastreams/datastreams_proxy.cpp
@@ -176,7 +176,7 @@ namespace NKikimr::NDataStreams::V1 {
{
return ReplyWithError(Ydb::StatusIds::ALREADY_EXISTS,
static_cast<size_t>(NYds::EErrorCodes::IN_USE),
- TStringBuilder() << "Stream with name " << GetProtoRequest()->stream_name() << " is already exists",
+ TStringBuilder() << "Stream with name " << GetProtoRequest()->stream_name() << " already exists",
ctx);
}
return TBase::TBase::Handle(ev, ctx);
@@ -568,13 +568,13 @@ namespace NKikimr::NDataStreams::V1 {
void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) {
if (ev->Get()->Status != NKikimrProto::EReplyStatus::OK) {
- ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, static_cast<size_t>(NYds::EErrorCodes::ERROR),
+ ReplyWithError(Ydb::StatusIds::UNAVAILABLE, Ydb::PersQueue::ErrorCode::TABLET_PIPE_DISCONNECTED,
TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx);
}
}
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) {
- ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, static_cast<size_t>(NYds::EErrorCodes::ERROR),
+ ReplyWithError(Ydb::StatusIds::UNAVAILABLE, Ydb::PersQueue::ErrorCode::TABLET_PIPE_DISCONNECTED,
TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx);
}
@@ -1471,7 +1471,7 @@ namespace NKikimr::NDataStreams::V1 {
return;
default:
return ReplyWithError(ConvertPersQueueInternalCodeToStatus(record.GetErrorCode()),
- static_cast<size_t>(NYds::EErrorCodes::ERROR),
+ ConvertOldCode(record.GetErrorCode()),
record.GetErrorReason(), ctx);
}
break;
@@ -1516,13 +1516,13 @@ namespace NKikimr::NDataStreams::V1 {
void TGetRecordsActor::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) {
if (ev->Get()->Status != NKikimrProto::EReplyStatus::OK) {
- ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, static_cast<size_t>(NYds::EErrorCodes::ERROR),
+ ReplyWithError(Ydb::StatusIds::UNAVAILABLE, Ydb::PersQueue::ErrorCode::TABLET_PIPE_DISCONNECTED,
TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx);
}
}
void TGetRecordsActor::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) {
- ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, static_cast<size_t>(NYds::EErrorCodes::ERROR),
+ ReplyWithError(Ydb::StatusIds::UNAVAILABLE, Ydb::PersQueue::ErrorCode::TABLET_PIPE_DISCONNECTED,
TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx);
}
@@ -1798,13 +1798,13 @@ namespace NKikimr::NDataStreams::V1 {
void TListShardsActor::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) {
if (ev->Get()->Status != NKikimrProto::EReplyStatus::OK) {
- ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, static_cast<size_t>(NYds::EErrorCodes::ERROR),
+ ReplyWithError(Ydb::StatusIds::UNAVAILABLE, Ydb::PersQueue::ErrorCode::TABLET_PIPE_DISCONNECTED,
TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx);
}
}
void TListShardsActor::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) {
- ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, static_cast<size_t>(NYds::EErrorCodes::ERROR),
+ ReplyWithError(Ydb::StatusIds::UNAVAILABLE, Ydb::PersQueue::ErrorCode::TABLET_PIPE_DISCONNECTED,
TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx);
}
diff --git a/ydb/services/lib/actors/pq_schema_actor.h b/ydb/services/lib/actors/pq_schema_actor.h
index 047de2a0356..91c7b5e8126 100644
--- a/ydb/services/lib/actors/pq_schema_actor.h
+++ b/ydb/services/lib/actors/pq_schema_actor.h
@@ -207,12 +207,8 @@ namespace NKikimr::NGRpcProxy::V1 {
if (ev->Get()->Request.Get()->ResultSet.size() != 1 ||
ev->Get()->Request.Get()->ResultSet.begin()->Kind !=
NSchemeCache::TSchemeCacheNavigate::KindTopic) {
- this->Request_->RaiseIssue(
- FillIssue(
- TStringBuilder() << "path '" << path << "' is not a stream",
- Ydb::PersQueue::ErrorCode::VALIDATION_ERROR
- )
- );
+ this->Request_->RaiseIssue(FillIssue(TStringBuilder() << "path '" << path << "' is not a topic",
+ Ydb::PersQueue::ErrorCode::VALIDATION_ERROR));
TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
return true;
}
@@ -418,7 +414,7 @@ namespace NKikimr::NGRpcProxy::V1 {
if (status == TEvTxUserProxy::TResultStatus::ExecError && msg->Record.GetSchemeShardStatus() == NKikimrScheme::EStatus::StatusPreconditionFailed)
{
return TBase::ReplyWithError(Ydb::StatusIds::OVERLOADED,
- Ydb::PersQueue::ErrorCode::ERROR,
+ Ydb::PersQueue::ErrorCode::OVERLOAD,
TStringBuilder() << "Topic with name " << TBase::GetTopicPath(ctx) << " has another alter in progress",
ctx);
}
diff --git a/ydb/services/persqueue_v1/actors/commit_offset_actor.cpp b/ydb/services/persqueue_v1/actors/commit_offset_actor.cpp
index a06c054b565..7d39c06854a 100644
--- a/ydb/services/persqueue_v1/actors/commit_offset_actor.cpp
+++ b/ydb/services/persqueue_v1/actors/commit_offset_actor.cpp
@@ -97,8 +97,6 @@ void TCommitOffsetActor::Handle(TEvPQProxy::TEvAuthResultOk::TPtr& ev, const TAc
Y_VERIFY(TopicAndTablets.size() == 1);
auto& [topic, topicInitInfo] = *TopicAndTablets.begin();
- // AnswerError("test auth ok", Ydb::PersQueue::ErrorCode::ERROR, ctx);
-
if (topicInitInfo.PartitionIdToTabletId.find(PartitionId) == topicInitInfo.PartitionIdToTabletId.end()) {
AnswerError("partition id not found in topic", PersQueue::ErrorCode::WRONG_PARTITION_NUMBER, ctx);
}
@@ -142,7 +140,8 @@ void TCommitOffsetActor::Handle(TEvPQProxy::TEvAuthResultOk::TPtr& ev, const TAc
void TCommitOffsetActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx) {
if (ev->Get()->Record.GetStatus() != NMsgBusProxy::MSTATUS_OK) {
- return AnswerError(ev->Get()->Record.GetErrorReason(), PersQueue::ErrorCode::ERROR, ctx);
+ auto errorCode = ConvertOldCode(ev->Get()->Record.GetErrorCode());
+ return AnswerError(ev->Get()->Record.GetErrorReason(), errorCode, ctx);
}
// Convert to correct response.
diff --git a/ydb/services/persqueue_v1/actors/partition_actor.cpp b/ydb/services/persqueue_v1/actors/partition_actor.cpp
index 402a93cf514..997ce3b4023 100644
--- a/ydb/services/persqueue_v1/actors/partition_actor.cpp
+++ b/ydb/services/persqueue_v1/actors/partition_actor.cpp
@@ -204,7 +204,8 @@ void TPartitionActor::RestartPipe(const TActorContext& ctx, const TString& reaso
++PipeGeneration;
if (PipeGeneration == MAX_PIPE_RESTARTS) {
- ctx.Send(ParentId, new TEvPQProxy::TEvCloseSession("too much attempts to restart pipe", PersQueue::ErrorCode::ERROR));
+ // ???
+ ctx.Send(ParentId, new TEvPQProxy::TEvCloseSession("too much attempts to restart pipe", PersQueue::ErrorCode::TABLET_PIPE_DISCONNECTED));
return;
}
@@ -454,6 +455,7 @@ void TPartitionActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorCo
if (ev->Get()->Record.GetStatus() != NKikimr::NMsgBusProxy::MSTATUS_OK) { //this is incorrect answer, die
Y_VERIFY(!ev->Get()->Record.HasErrorCode());
Counters.Errors.Inc();
+ // map NMsgBusProxy::EResponseStatus to PersQueue::ErrorCode???
ctx.Send(ParentId, new TEvPQProxy::TEvCloseSession("status is not ok: " + ev->Get()->Record.GetErrorReason(), PersQueue::ErrorCode::ERROR));
return;
}
@@ -651,13 +653,13 @@ void TPartitionActor::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const
<< " pipe restart attempt " << PipeGeneration << " pipe creation result: " << msg->Status);
if (msg->Status != NKikimrProto::OK) {
- RestartPipe(ctx, TStringBuilder() << "pipe to tablet is dead " << msg->TabletId, NPersQueue::NErrorCode::ERROR);
+ RestartPipe(ctx, TStringBuilder() << "pipe to tablet is dead " << msg->TabletId, NPersQueue::NErrorCode::TABLET_PIPE_DISCONNECTED);
return;
}
}
void TPartitionActor::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) {
- RestartPipe(ctx, TStringBuilder() << "pipe to tablet is dead " << ev->Get()->TabletId, NPersQueue::NErrorCode::ERROR);
+ RestartPipe(ctx, TStringBuilder() << "pipe to tablet is dead " << ev->Get()->TabletId, NPersQueue::NErrorCode::TABLET_PIPE_DISCONNECTED);
}
diff --git a/ydb/services/persqueue_v1/actors/persqueue_utils.cpp b/ydb/services/persqueue_v1/actors/persqueue_utils.cpp
index 8ca421a3e93..459302b013b 100644
--- a/ydb/services/persqueue_v1/actors/persqueue_utils.cpp
+++ b/ydb/services/persqueue_v1/actors/persqueue_utils.cpp
@@ -97,70 +97,26 @@ Ydb::StatusIds::StatusCode ConvertPersQueueInternalCodeToStatus(const Ydb::PersQ
case TABLET_PIPE_DISCONNECTED:
return Ydb::StatusIds::UNAVAILABLE;
case PREFERRED_CLUSTER_MISMATCHED:
+ case SOURCEID_DELETED:
return Ydb::StatusIds::ABORTED;
case OVERLOAD:
- return Ydb::StatusIds::OVERLOADED;
- case BAD_REQUEST:
- return Ydb::StatusIds::BAD_REQUEST;
- case WRONG_COOKIE:
- case CREATE_SESSION_ALREADY_LOCKED:
- case DELETE_SESSION_NO_SESSION:
- case READ_ERROR_NO_SESSION:
- return Ydb::StatusIds::SESSION_EXPIRED;
case WRITE_ERROR_PARTITION_IS_FULL:
case WRITE_ERROR_DISK_IS_FULL:
+ return Ydb::StatusIds::OVERLOADED;
+ case BAD_REQUEST:
case WRITE_ERROR_BAD_OFFSET:
- case SOURCEID_DELETED:
- case READ_ERROR_IN_PROGRESS:
case READ_ERROR_TOO_SMALL_OFFSET:
case READ_ERROR_TOO_BIG_OFFSET:
case SET_OFFSET_ERROR_COMMIT_TO_FUTURE:
case SET_OFFSET_ERROR_COMMIT_TO_PAST:
- case READ_NOT_DONE:
- return Ydb::StatusIds::GENERIC_ERROR;
- case TABLET_IS_DROPPED:
- case UNKNOWN_TOPIC:
- case WRONG_PARTITION_NUMBER:
- return Ydb::StatusIds::SCHEME_ERROR;
- case ACCESS_DENIED:
- return Ydb::StatusIds::UNAUTHORIZED;
- case ERROR:
- return Ydb::StatusIds::GENERIC_ERROR;
-
- default:
- return Ydb::StatusIds::STATUS_CODE_UNSPECIFIED;
- }
-}
-
-Ydb::StatusIds::StatusCode ConvertPersQueueInternalCodeToStatus(const NPersQueue::NErrorCode::EErrorCode code)
-{
- using namespace NPersQueue::NErrorCode;
-
- switch(code) {
- case OK :
- return Ydb::StatusIds::SUCCESS;
- case INITIALIZING:
- case CLUSTER_DISABLED:
- return Ydb::StatusIds::UNAVAILABLE;
- case OVERLOAD:
- return Ydb::StatusIds::OVERLOADED;
- case BAD_REQUEST:
return Ydb::StatusIds::BAD_REQUEST;
case WRONG_COOKIE:
case CREATE_SESSION_ALREADY_LOCKED:
case DELETE_SESSION_NO_SESSION:
case READ_ERROR_NO_SESSION:
return Ydb::StatusIds::SESSION_EXPIRED;
- case WRITE_ERROR_PARTITION_IS_FULL:
- case WRITE_ERROR_DISK_IS_FULL:
- case WRITE_ERROR_BAD_OFFSET:
- case SOURCEID_DELETED:
- case READ_ERROR_IN_PROGRESS:
- case READ_ERROR_TOO_SMALL_OFFSET:
- case READ_ERROR_TOO_BIG_OFFSET:
- case SET_OFFSET_ERROR_COMMIT_TO_FUTURE:
case READ_NOT_DONE:
- return Ydb::StatusIds::GENERIC_ERROR;
+ return Ydb::StatusIds::INTERNAL_ERROR;
case TABLET_IS_DROPPED:
case UNKNOWN_TOPIC:
case WRONG_PARTITION_NUMBER:
@@ -168,13 +124,18 @@ Ydb::StatusIds::StatusCode ConvertPersQueueInternalCodeToStatus(const NPersQueue
case ACCESS_DENIED:
return Ydb::StatusIds::UNAUTHORIZED;
case ERROR:
- return Ydb::StatusIds::GENERIC_ERROR;
+ return Ydb::StatusIds::UNSUPPORTED;
default:
return Ydb::StatusIds::STATUS_CODE_UNSPECIFIED;
}
}
+Ydb::StatusIds::StatusCode ConvertPersQueueInternalCodeToStatus(const NPersQueue::NErrorCode::EErrorCode code)
+{
+ return ConvertPersQueueInternalCodeToStatus(ConvertOldCode(code));
+}
+
Ydb::PersQueue::ErrorCode::ErrorCode ConvertOldCode(const NPersQueue::NErrorCode::EErrorCode code)
{
if (code == NPersQueue::NErrorCode::OK)
diff --git a/ydb/services/persqueue_v1/actors/persqueue_utils.h b/ydb/services/persqueue_v1/actors/persqueue_utils.h
index 56a03bff148..7d8246bfddc 100644
--- a/ydb/services/persqueue_v1/actors/persqueue_utils.h
+++ b/ydb/services/persqueue_v1/actors/persqueue_utils.h
@@ -54,6 +54,7 @@ Ydb::PersQueue::ErrorCode::ErrorCode ConvertOldCode(const NPersQueue::NErrorCode
#pragma clang diagnostic ignored "-Wunused-function"
static inline bool InternalErrorCode(Ydb::PersQueue::ErrorCode::ErrorCode errorCode) {
switch(errorCode) {
+ // TODO: check list
case Ydb::PersQueue::ErrorCode::UNKNOWN_TOPIC:
case Ydb::PersQueue::ErrorCode::ERROR:
case Ydb::PersQueue::ErrorCode::INITIALIZING:
diff --git a/ydb/services/persqueue_v1/actors/read_info_actor.cpp b/ydb/services/persqueue_v1/actors/read_info_actor.cpp
index 6cfe69b348a..5db1bdc512a 100644
--- a/ydb/services/persqueue_v1/actors/read_info_actor.cpp
+++ b/ydb/services/persqueue_v1/actors/read_info_actor.cpp
@@ -113,6 +113,7 @@ void TReadInfoActor::Handle(TEvPQProxy::TEvAuthResultOk::TPtr& ev, const TActorC
void TReadInfoActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx) {
if (ev->Get()->Record.GetStatus() != NMsgBusProxy::MSTATUS_OK) {
+ // map NMsgBusProxy::EResponseStatus to PersQueue::ErrorCode???
return AnswerError(ev->Get()->Record.GetErrorReason(), PersQueue::ErrorCode::ERROR, ctx);
}
diff --git a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp
index d864f00f1f9..cd1ce0d3084 100644
--- a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp
+++ b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp
@@ -233,7 +233,7 @@ void TReadInitAndAuthActor::HandleClientSchemeCacheResponse(
auto path = "/" + JoinPath(entry.Path); // ToDo [migration] - through converter ?
if (navigate->ErrorCount > 0) {
const NSchemeCache::TSchemeCacheNavigate::EStatus status = navigate->ResultSet.front().Status;
- CloseSession(TStringBuilder() << "Failed to read ACL for '" << path << "' Scheme cache error : " << status, PersQueue::ErrorCode::ERROR, ctx);
+ CloseSession(TStringBuilder() << "Failed to read ACL for '" << path << "' Scheme cache error : " << status, PersQueue::ErrorCode::UNKNOWN_TOPIC, ctx);
return;
}
diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
index 6731ea50b4e..3af30877721 100644
--- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
@@ -1296,7 +1296,7 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvTabletPipe::TEvClientCon
}
// TODO: remove it
- return CloseSession(PersQueue::ErrorCode::ERROR, TStringBuilder()
+ return CloseSession(PersQueue::ErrorCode::TABLET_PIPE_DISCONNECTED, TStringBuilder()
<< "unable to connect to one of topics, tablet " << msg->TabletId, ctx);
#if 0
diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp
index 06010d3477a..4fafb16b21f 100644
--- a/ydb/services/persqueue_v1/actors/schema_actors.cpp
+++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp
@@ -967,7 +967,7 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv
if (GetProtoRequest()->include_stats()) {
if (Consumer && !found) {
- Request_->RaiseIssue(FillIssue(TStringBuilder() << "no consumer '" << Consumer << "' in topic", Ydb::PersQueue::ErrorCode::ERROR));
+ Request_->RaiseIssue(FillIssue(TStringBuilder() << "no consumer '" << Consumer << "' in topic", Ydb::PersQueue::ErrorCode::BAD_REQUEST));
return ReplyWithResult(Ydb::StatusIds::SCHEME_ERROR, ctx);
}
@@ -1020,7 +1020,7 @@ void TDescribeConsumerActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::
break;
}
if (!found) {
- Request_->RaiseIssue(FillIssue(TStringBuilder() << "no consumer '" << Consumer << "' in topic", Ydb::PersQueue::ErrorCode::ERROR));
+ Request_->RaiseIssue(FillIssue(TStringBuilder() << "no consumer '" << Consumer << "' in topic", Ydb::PersQueue::ErrorCode::BAD_REQUEST));
return ReplyWithResult(Ydb::StatusIds::SCHEME_ERROR, ctx);
}
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
index 01ddba32d64..1d1104ca992 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
@@ -713,7 +713,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NKqp::TEvKqp::TEvCreateSes
TStringBuilder errorReason;
errorReason << "kqp error Marker# PQ53 : " << record;
- CloseSession(errorReason, PersQueue::ErrorCode::ERROR, ctx);
+ CloseSession(errorReason, PersQueue::ErrorCode::INITIALIZING, ctx);
return;
}
@@ -800,7 +800,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NKqp::TEvKqp::TEvQueryResp
LOG_WARN_S(ctx, NKikimrServices::PQ_WRITE_PROXY, errorReason);
SourceIdUpdatesInflight--;
} else {
- CloseSession(errorReason, PersQueue::ErrorCode::ERROR, ctx);
+ CloseSession(errorReason, PersQueue::ErrorCode::INITIALIZING, ctx);
}
return;
}
@@ -1196,7 +1196,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T
for (size_t messageIndex = 0, endIndex = userWriteRequest->Request.write_request().sequence_numbers_size(); messageIndex != endIndex; ++messageIndex) {
if (partitionCmdWriteResultIndex == resp.CmdWriteResultSize()) {
- CloseSession("too less responses from server", PersQueue::ErrorCode::ERROR, ctx);
+ CloseSession("too few responses from server", PersQueue::ErrorCode::ERROR, ctx);
return;
}
const auto& partitionCmdWriteResult = resp.GetCmdWriteResult(partitionCmdWriteResultIndex);
@@ -1216,7 +1216,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T
for (size_t messageIndex = 0, endIndex = userWriteRequest->Request.write_request().messages_size(); messageIndex != endIndex; ++messageIndex) {
if (partitionCmdWriteResultIndex == resp.CmdWriteResultSize()) {
- CloseSession("too less responses from server", PersQueue::ErrorCode::ERROR, ctx);
+ CloseSession("too few responses from server", PersQueue::ErrorCode::ERROR, ctx);
return;
}
const auto& partitionCmdWriteResult = resp.GetCmdWriteResult(partitionCmdWriteResultIndex);
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index 6a8528bd2e4..5e9d2a6cc89 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -1023,9 +1023,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
Cerr << resp << "\n";
UNIT_ASSERT(status.ok());
- UNIT_ASSERT_VALUES_EQUAL(resp.operation().status(), Ydb::StatusIds::GENERIC_ERROR);
- // TODO: change to BAD_REQUEST
- // UNIT_ASSERT_VALUES_EQUAL(resp.operation().status(), Ydb::StatusIds::BAD_REQUEST);
+ UNIT_ASSERT_VALUES_EQUAL(resp.operation().status(), Ydb::StatusIds::BAD_REQUEST);
}
// commit to past - expect bad request
@@ -1043,9 +1041,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
Cerr << resp << "\n";
UNIT_ASSERT(status.ok());
- UNIT_ASSERT_VALUES_EQUAL(resp.operation().status(), Ydb::StatusIds::GENERIC_ERROR);
- // TODO: change to BAD_REQUEST
- // UNIT_ASSERT_VALUES_EQUAL(resp.operation().status(), Ydb::StatusIds::BAD_REQUEST);
+ UNIT_ASSERT_VALUES_EQUAL(resp.operation().status(), Ydb::StatusIds::BAD_REQUEST);
}
// commit to valid offset - expect successful commit
@@ -1100,9 +1096,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
Cerr << resp << "\n";
UNIT_ASSERT(status.ok());
- UNIT_ASSERT_VALUES_EQUAL(resp.operation().status(), Ydb::StatusIds::GENERIC_ERROR);
- // TODO: change to BAD_REQUEST
- // UNIT_ASSERT_VALUES_EQUAL(resp.operation().status(), Ydb::StatusIds::BAD_REQUEST);
+ UNIT_ASSERT_VALUES_EQUAL(resp.operation().status(), Ydb::StatusIds::BAD_REQUEST);
}
// commit to future - expect bad request
@@ -1120,9 +1114,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
Cerr << resp << "\n";
UNIT_ASSERT(status.ok());
- UNIT_ASSERT_VALUES_EQUAL(resp.operation().status(), Ydb::StatusIds::GENERIC_ERROR);
- // TODO: change to BAD_REQUEST
- // UNIT_ASSERT_VALUES_EQUAL(resp.operation().status(), Ydb::StatusIds::BAD_REQUEST);
+ UNIT_ASSERT_VALUES_EQUAL(resp.operation().status(), Ydb::StatusIds::BAD_REQUEST);
}
}