aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormokhotskii <mokhotskii@yandex-team.ru>2022-03-24 22:41:55 +0300
committermokhotskii <mokhotskii@yandex-team.ru>2022-03-24 22:41:55 +0300
commit3d072ebbe1c2292e8eb35427050bf960f3f1a8d9 (patch)
treefb3d832e047c2552959a8e772cb29b291e5f4d87
parent336c4b71b8c800813fb3fc246335ce71bab6d170 (diff)
downloadydb-3d072ebbe1c2292e8eb35427050bf960f3f1a8d9.tar.gz
LOGBROKER-7298 Fix verify on GetRecords call
Fix VERIFY issue ref:8a203cd72d8e89bce0e0859479ae144bf8576aef
-rw-r--r--ydb/core/persqueue/partition.cpp24
-rw-r--r--ydb/core/persqueue/pq_impl.cpp8
-rw-r--r--ydb/services/datastreams/CMakeLists.txt1
-rw-r--r--ydb/services/datastreams/datastreams_proxy.cpp56
-rw-r--r--ydb/services/datastreams/datastreams_ut.cpp16
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_actor.h2
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_read_actor.cpp57
-rw-r--r--ydb/services/persqueue_v1/persqueue_utils.cpp88
-rw-r--r--ydb/services/persqueue_v1/persqueue_utils.h3
9 files changed, 182 insertions, 73 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index af28a8f8e2..127f7ddf5e 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -2617,19 +2617,29 @@ void TPartition::Handle(TEvPQ::TEvRead::TPtr& ev, const TActorContext& ctx) {
Counters.Cumulative()[COUNTER_PQ_READ_ERROR_SMALL_OFFSET].Increment(1);
read->Offset = StartOffset;
if (read->PartNo > 0) {
- LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, "I was right, there could be rewinds and deletions at once! Topic " << TopicName << " partition " << Partition
- << " readOffset " << read->Offset << " readPartNo " << read->PartNo << " startOffset " << StartOffset);
- ReplyError(ctx, read->Cookie, NPersQueue::NErrorCode::READ_ERROR_TOO_SMALL_OFFSET, "client requested not from first part, and this part is lost");
+ LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE,
+ "I was right, there could be rewinds and deletions at once! Topic " << TopicName <<
+ " partition " << Partition <<
+ " readOffset " << read->Offset <<
+ " readPartNo " << read->PartNo <<
+ " startOffset " << StartOffset);
+ ReplyError(ctx, read->Cookie, NPersQueue::NErrorCode::READ_ERROR_TOO_SMALL_OFFSET,
+ "client requested not from first part, and this part is lost");
return;
}
}
if (read->Offset > EndOffset || read->Offset == EndOffset && read->PartNo > 0) {
Counters.Cumulative()[COUNTER_PQ_READ_ERROR_BIG_OFFSET].Increment(1);
Counters.Percentile()[COUNTER_LATENCY_PQ_READ_ERROR].IncrementFor(0);
- LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, "reading from too big offset - topic " << TopicName << " partition " << Partition << " client "
- << read->ClientId << " EndOffset " << EndOffset << " offset " << read->Offset);
- ReplyError(ctx, read->Cookie, NPersQueue::NErrorCode::READ_ERROR_TOO_BIG_OFFSET,
- TStringBuilder() << "trying to read from future. ReadOffset " << read->Offset << ", " << read->PartNo << " EndOffset " << EndOffset);
+ LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE,
+ "reading from too big offset - topic " << TopicName <<
+ " partition " << Partition <<
+ " client " << read->ClientId <<
+ " EndOffset " << EndOffset <<
+ " offset " << read->Offset);
+ ReplyError(ctx, read->Cookie, NPersQueue::NErrorCode::READ_ERROR_TOO_BIG_OFFSET,
+ TStringBuilder() << "trying to read from future. ReadOffset " <<
+ read->Offset << ", " << read->PartNo << " EndOffset " << EndOffset);
return;
}
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index e3d68aea43..bdec76536e 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -313,8 +313,12 @@ public:
bool HandleError(TEvPQ::TEvError *ev, const TActorContext& ctx)
{
- LOG_WARN_S(ctx, NKikimrServices::PERSQUEUE, "Answer error topic: '" << TopicName << "' partition: " << Partition
- << " messageNo: " << MessageNo << " requestId: " << ReqId << " error: " << ev->Error);
+ LOG_WARN_S(ctx, NKikimrServices::PERSQUEUE,
+ "Answer error topic: '" << TopicName << "'" <<
+ " partition: " << Partition <<
+ " messageNo: " << MessageNo <<
+ " requestId: " << ReqId <<
+ " error: " << ev->Error);
Response->Record.SetStatus(NMsgBusProxy::MSTATUS_ERROR);
Response->Record.SetErrorCode(ev->ErrorCode);
Response->Record.SetErrorReason(ev->Error);
diff --git a/ydb/services/datastreams/CMakeLists.txt b/ydb/services/datastreams/CMakeLists.txt
index 05d672dbdd..5cbec71234 100644
--- a/ydb/services/datastreams/CMakeLists.txt
+++ b/ydb/services/datastreams/CMakeLists.txt
@@ -23,6 +23,7 @@ target_link_libraries(ydb-services-datastreams PUBLIC
cpp-client-ydb_datastreams
services-lib-actors
services-lib-sharding
+ ydb-services-persqueue_v1
ydb-services-ydb
)
target_sources(ydb-services-datastreams PRIVATE
diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp
index ba5ee058f0..9987c8d4c0 100644
--- a/ydb/services/datastreams/datastreams_proxy.cpp
+++ b/ydb/services/datastreams/datastreams_proxy.cpp
@@ -12,6 +12,7 @@
#include <ydb/services/lib/actors/pq_schema_actor.h>
#include <ydb/services/lib/sharding/sharding.h>
+#include <ydb/services/persqueue_v1/persqueue_utils.h>
#include <util/folder/path.h>
@@ -438,13 +439,13 @@ namespace NKikimr::NDataStreams::V1 {
void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) {
if (ev->Get()->Status != NKikimrProto::EReplyStatus::OK) {
- ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, Ydb::PersQueue::ErrorCode::ERROR,
+ ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, Ydb::PersQueue::ErrorCode::ERROR,
TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx);
}
}
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) {
- ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, Ydb::PersQueue::ErrorCode::ERROR,
+ ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, Ydb::PersQueue::ErrorCode::ERROR,
TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx);
}
@@ -1141,6 +1142,9 @@ namespace NKikimr::NDataStreams::V1 {
void StateWork(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx);
void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx);
+ void Die(const TActorContext& ctx) override;
private:
void SendReadRequest(const TActorContext& ctx);
@@ -1153,6 +1157,7 @@ namespace NKikimr::NDataStreams::V1 {
ui64 TabletId;
i32 Limit;
TActorId NewSchemeCache;
+ TActorId PipeClient;
};
TGetRecordsActor::TGetRecordsActor(TEvDataStreamsGetRecordsRequest* request,
@@ -1195,7 +1200,9 @@ namespace NKikimr::NDataStreams::V1 {
.BackoffMultiplier = 2,
.DoFirstRetryInstantly = true
};
- auto PipeClient = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, TabletId, clientConfig));
+ PipeClient = ctx.RegisterWithSameMailbox(
+ NTabletPipe::CreateClient(ctx.SelfID, TabletId, clientConfig)
+ );
NKikimrClient::TPersQueueRequest request;
request.MutablePartitionRequest()->SetTopic(this->GetTopicPath(ctx));
@@ -1218,6 +1225,8 @@ namespace NKikimr::NDataStreams::V1 {
void TGetRecordsActor::StateWork(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) {
switch (ev->GetTypeRewrite()) {
HFunc(TEvPersQueue::TEvResponse, Handle);
+ HFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
+ HFunc(TEvTabletPipe::TEvClientConnected, Handle);
default: TBase::StateWork(ev, ctx);
}
}
@@ -1241,7 +1250,6 @@ namespace NKikimr::NDataStreams::V1 {
}
}
-
if (response.Self->Info.GetPathType() == NKikimrSchemeOp::EPathTypePersQueueGroup) {
const auto& partitions = response.PQGroupInfo->Description.GetPartitions();
for (auto& partition : partitions) {
@@ -1259,7 +1267,20 @@ namespace NKikimr::NDataStreams::V1 {
void TGetRecordsActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx) {
const auto& record = ev->Get()->Record;
- Y_VERIFY(ev->Get()->Record.HasPartitionResponse());
+ switch (record.GetStatus()) {
+ case NMsgBusProxy::MSTATUS_ERROR:
+ switch (record.GetErrorCode()) {
+ case NPersQueue::NErrorCode::READ_ERROR_TOO_SMALL_OFFSET:
+ case NPersQueue::NErrorCode::READ_ERROR_TOO_BIG_OFFSET:
+ return SendResponse(ctx, {}, 0);
+ default:
+ return ReplyWithError(ConvertPersQueueInternalCodeToStatus(record.GetErrorCode()),
+ Ydb::PersQueue::ErrorCode::ERROR,
+ record.GetErrorReason(), ctx);
+ }
+ break;
+ default: {}
+ }
ui64 millisBehindLatestMs = 0;
std::vector<Ydb::DataStreams::V1::Record> records;
@@ -1277,12 +1298,26 @@ namespace NKikimr::NDataStreams::V1 {
record.set_sequence_number(std::to_string(r.GetOffset()).c_str());
records.push_back(record);
}
- millisBehindLatestMs = records.size() > 0 ? TInstant::Now().MilliSeconds() - results.rbegin()->GetWriteTimestampMS() : 0;
+ millisBehindLatestMs = records.size() > 0
+ ? TInstant::Now().MilliSeconds() - results.rbegin()->GetWriteTimestampMS()
+ : 0;
}
SendResponse(ctx, records, millisBehindLatestMs);
}
+ void TGetRecordsActor::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) {
+ if (ev->Get()->Status != NKikimrProto::EReplyStatus::OK) {
+ ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, Ydb::PersQueue::ErrorCode::ERROR,
+ TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx);
+ }
+ }
+
+ void TGetRecordsActor::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) {
+ ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, Ydb::PersQueue::ErrorCode::ERROR,
+ TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx);
+ }
+
void TGetRecordsActor::SendResponse(const TActorContext& ctx,
const std::vector<Ydb::DataStreams::V1::Record>& records,
ui64 millisBehindLatestMs) {
@@ -1307,6 +1342,11 @@ namespace NKikimr::NDataStreams::V1 {
Die(ctx);
}
+ void TGetRecordsActor::Die(const TActorContext& ctx) {
+ NTabletPipe::CloseClient(ctx, PipeClient);
+ TBase::Die(ctx);
+ }
+
//-----------------------------------------------------------------------------------------
class TListShardsActor : public TPQGrpcSchemaBase<TListShardsActor, NKikimr::NGRpcService::TEvDataStreamsListShardsRequest> {
@@ -1533,13 +1573,13 @@ namespace NKikimr::NDataStreams::V1 {
void TListShardsActor::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) {
if (ev->Get()->Status != NKikimrProto::EReplyStatus::OK) {
- ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, Ydb::PersQueue::ErrorCode::ERROR,
+ ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, Ydb::PersQueue::ErrorCode::ERROR,
TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx);
}
}
void TListShardsActor::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) {
- ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, Ydb::PersQueue::ErrorCode::ERROR,
+ ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, Ydb::PersQueue::ErrorCode::ERROR,
TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx);
}
diff --git a/ydb/services/datastreams/datastreams_ut.cpp b/ydb/services/datastreams/datastreams_ut.cpp
index 396363e1e5..27847e1ace 100644
--- a/ydb/services/datastreams/datastreams_ut.cpp
+++ b/ydb/services/datastreams/datastreams_ut.cpp
@@ -1265,6 +1265,22 @@ Y_UNIT_TEST_SUITE(DataStreams) {
UNIT_ASSERT_VALUES_EQUAL(result.GetResult().records().size(), recordsCount - 3);
}
+ {
+ auto result = testServer.DataStreamsClient->GetShardIterator(streamName, "shard-000000",
+ YDS_V1::ShardIteratorType::AFTER_SEQUENCE_NUMBER,
+ NYDS_V1::TGetShardIteratorSettings().StartingSequenceNumber("99999")
+ ).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+ shardIterator = result.GetResult().shard_iterator();
+ }
+
+ {
+ auto result = testServer.DataStreamsClient->GetRecords(shardIterator).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetResult().records().size(), 0);
+ }
{
auto result = testServer.DataStreamsClient->GetShardIterator(streamName, "shard-000000",
diff --git a/ydb/services/persqueue_v1/grpc_pq_actor.h b/ydb/services/persqueue_v1/grpc_pq_actor.h
index 0e86e0fd7f..afc4d8f69e 100644
--- a/ydb/services/persqueue_v1/grpc_pq_actor.h
+++ b/ydb/services/persqueue_v1/grpc_pq_actor.h
@@ -46,8 +46,6 @@ PersQueue::ErrorCode::ErrorCode ConvertOldCode(const NPersQueue::NErrorCode::EEr
void FillIssue(Ydb::Issue::IssueMessage* issue, const PersQueue::ErrorCode::ErrorCode errorCode, const TString& errorReason);
-Ydb::StatusIds::StatusCode ConvertPersQueueInternalCodeToStatus(const PersQueue::ErrorCode::ErrorCode code);
-
const TString& TopicPrefix(const TActorContext& ctx);
static const TDuration CHECK_ACL_DELAY = TDuration::Minutes(5);
diff --git a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
index 3e46415a89..9f50b6511b 100644
--- a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
+++ b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
@@ -29,8 +29,7 @@ namespace NKikimr {
using namespace NMsgBusProxy;
-namespace NGRpcProxy {
-namespace V1 {
+namespace NGRpcProxy::V1 {
using namespace PersQueue::V1;
@@ -3015,55 +3014,5 @@ void TReadInfoActor::Handle(TEvPQProxy::TEvCloseSession::TPtr& ev, const TActorC
AnswerError(ev->Get()->Reason, ev->Get()->ErrorCode, ctx);
}
-
-
-Ydb::StatusIds::StatusCode ConvertPersQueueInternalCodeToStatus(const PersQueue::ErrorCode::ErrorCode code) {
-
- using namespace PersQueue::ErrorCode;
-
- switch(code) {
- case OK :
- return Ydb::StatusIds::SUCCESS;
- case INITIALIZING:
- case CLUSTER_DISABLED:
- return Ydb::StatusIds::UNAVAILABLE;
- case PREFERRED_CLUSTER_MISMATCHED:
- return Ydb::StatusIds::ABORTED;
- case OVERLOAD:
- return Ydb::StatusIds::OVERLOADED;
- case BAD_REQUEST:
- return Ydb::StatusIds::BAD_REQUEST;
- case WRONG_COOKIE:
- case CREATE_SESSION_ALREADY_LOCKED:
- case DELETE_SESSION_NO_SESSION:
- case READ_ERROR_NO_SESSION:
- return Ydb::StatusIds::SESSION_EXPIRED;
- case WRITE_ERROR_PARTITION_IS_FULL:
- case WRITE_ERROR_DISK_IS_FULL:
- case WRITE_ERROR_BAD_OFFSET:
- case SOURCEID_DELETED:
- case READ_ERROR_IN_PROGRESS:
- case READ_ERROR_TOO_SMALL_OFFSET:
- case READ_ERROR_TOO_BIG_OFFSET:
- case SET_OFFSET_ERROR_COMMIT_TO_FUTURE:
- case READ_NOT_DONE:
- return Ydb::StatusIds::GENERIC_ERROR;
- case TABLET_IS_DROPPED:
- case UNKNOWN_TOPIC:
- case WRONG_PARTITION_NUMBER:
- return Ydb::StatusIds::SCHEME_ERROR;
- case ACCESS_DENIED:
- return Ydb::StatusIds::UNAUTHORIZED;
- case ERROR:
- return Ydb::StatusIds::GENERIC_ERROR;
-
- default:
- return Ydb::StatusIds::STATUS_CODE_UNSPECIFIED;
- }
-}
-
-
-
-}
-}
-}
+} // namespace NGRpcProxy::V1
+} // namespace NKikimr
diff --git a/ydb/services/persqueue_v1/persqueue_utils.cpp b/ydb/services/persqueue_v1/persqueue_utils.cpp
index 51d7f38045..3b7654d01c 100644
--- a/ydb/services/persqueue_v1/persqueue_utils.cpp
+++ b/ydb/services/persqueue_v1/persqueue_utils.cpp
@@ -83,4 +83,92 @@ TProcessingResult ProcessMetaCacheTopicResponse(const TSchemeCacheNavigate::TEnt
return {};
}
+Ydb::StatusIds::StatusCode ConvertPersQueueInternalCodeToStatus(const Ydb::PersQueue::ErrorCode::ErrorCode code) {
+
+ using namespace Ydb::PersQueue::ErrorCode;
+
+ switch(code) {
+ case OK :
+ return Ydb::StatusIds::SUCCESS;
+ case INITIALIZING:
+ case CLUSTER_DISABLED:
+ return Ydb::StatusIds::UNAVAILABLE;
+ case PREFERRED_CLUSTER_MISMATCHED:
+ return Ydb::StatusIds::ABORTED;
+ case OVERLOAD:
+ return Ydb::StatusIds::OVERLOADED;
+ case BAD_REQUEST:
+ return Ydb::StatusIds::BAD_REQUEST;
+ case WRONG_COOKIE:
+ case CREATE_SESSION_ALREADY_LOCKED:
+ case DELETE_SESSION_NO_SESSION:
+ case READ_ERROR_NO_SESSION:
+ return Ydb::StatusIds::SESSION_EXPIRED;
+ case WRITE_ERROR_PARTITION_IS_FULL:
+ case WRITE_ERROR_DISK_IS_FULL:
+ case WRITE_ERROR_BAD_OFFSET:
+ case SOURCEID_DELETED:
+ case READ_ERROR_IN_PROGRESS:
+ case READ_ERROR_TOO_SMALL_OFFSET:
+ case READ_ERROR_TOO_BIG_OFFSET:
+ case SET_OFFSET_ERROR_COMMIT_TO_FUTURE:
+ case READ_NOT_DONE:
+ return Ydb::StatusIds::GENERIC_ERROR;
+ case TABLET_IS_DROPPED:
+ case UNKNOWN_TOPIC:
+ case WRONG_PARTITION_NUMBER:
+ return Ydb::StatusIds::SCHEME_ERROR;
+ case ACCESS_DENIED:
+ return Ydb::StatusIds::UNAUTHORIZED;
+ case ERROR:
+ return Ydb::StatusIds::GENERIC_ERROR;
+
+ default:
+ return Ydb::StatusIds::STATUS_CODE_UNSPECIFIED;
+ }
+}
+
+Ydb::StatusIds::StatusCode ConvertPersQueueInternalCodeToStatus(const NPersQueue::NErrorCode::EErrorCode code)
+{
+ using namespace NPersQueue::NErrorCode;
+
+ switch(code) {
+ case OK :
+ return Ydb::StatusIds::SUCCESS;
+ case INITIALIZING:
+ case CLUSTER_DISABLED:
+ return Ydb::StatusIds::UNAVAILABLE;
+ case OVERLOAD:
+ return Ydb::StatusIds::OVERLOADED;
+ case BAD_REQUEST:
+ return Ydb::StatusIds::BAD_REQUEST;
+ case WRONG_COOKIE:
+ case CREATE_SESSION_ALREADY_LOCKED:
+ case DELETE_SESSION_NO_SESSION:
+ case READ_ERROR_NO_SESSION:
+ return Ydb::StatusIds::SESSION_EXPIRED;
+ case WRITE_ERROR_PARTITION_IS_FULL:
+ case WRITE_ERROR_DISK_IS_FULL:
+ case WRITE_ERROR_BAD_OFFSET:
+ case SOURCEID_DELETED:
+ case READ_ERROR_IN_PROGRESS:
+ case READ_ERROR_TOO_SMALL_OFFSET:
+ case READ_ERROR_TOO_BIG_OFFSET:
+ case SET_OFFSET_ERROR_COMMIT_TO_FUTURE:
+ case READ_NOT_DONE:
+ return Ydb::StatusIds::GENERIC_ERROR;
+ case TABLET_IS_DROPPED:
+ case UNKNOWN_TOPIC:
+ case WRONG_PARTITION_NUMBER:
+ return Ydb::StatusIds::SCHEME_ERROR;
+ case ACCESS_DENIED:
+ return Ydb::StatusIds::UNAUTHORIZED;
+ case ERROR:
+ return Ydb::StatusIds::GENERIC_ERROR;
+
+ default:
+ return Ydb::StatusIds::STATUS_CODE_UNSPECIFIED;
+ }
+}
+
} // namespace NKikimr::NGRpcProxy::V1
diff --git a/ydb/services/persqueue_v1/persqueue_utils.h b/ydb/services/persqueue_v1/persqueue_utils.h
index 7860067982..b4ffa5b857 100644
--- a/ydb/services/persqueue_v1/persqueue_utils.h
+++ b/ydb/services/persqueue_v1/persqueue_utils.h
@@ -29,4 +29,7 @@ struct TProcessingResult {
TProcessingResult ProcessMetaCacheTopicResponse(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry);
+Ydb::StatusIds::StatusCode ConvertPersQueueInternalCodeToStatus(const Ydb::PersQueue::ErrorCode::ErrorCode code);
+Ydb::StatusIds::StatusCode ConvertPersQueueInternalCodeToStatus(const NPersQueue::NErrorCode::EErrorCode code);
+
} //namespace NKikimr::NGRpcProxy::V1