aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDaniil Cherednik <dcherednik@ydb.tech>2024-01-18 10:58:03 +0100
committerGitHub <noreply@github.com>2024-01-18 10:58:03 +0100
commitfa40ee8c90448fa7bab0264d40012edd8f117b29 (patch)
tree8158a560345bca5dd8205f068fe03d2594db6ab4
parent0a965144219a01dadef957ee280b78147d0667f7 (diff)
downloadydb-fa40ee8c90448fa7bab0264d40012edd8f117b29.tar.gz
Fix possible node crash in case of unavaliable response (#1107)
during refresh token process. The GrpcRequestProxy Unavaliable method should not be used from rpc actors. The interface will be isolated in a future.
-rw-r--r--ydb/services/persqueue_v1/actors/direct_read_actor.cpp4
-rw-r--r--ydb/services/persqueue_v1/actors/read_session_actor.ipp4
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.ipp4
3 files changed, 9 insertions, 3 deletions
diff --git a/ydb/services/persqueue_v1/actors/direct_read_actor.cpp b/ydb/services/persqueue_v1/actors/direct_read_actor.cpp
index 0e1fcf0414..883e966ab0 100644
--- a/ydb/services/persqueue_v1/actors/direct_read_actor.cpp
+++ b/ydb/services/persqueue_v1/actors/direct_read_actor.cpp
@@ -364,7 +364,9 @@ void TDirectReadSessionActor::Handle(NGRpcService::TGRpcRequestProxy::TEvRefresh
WriteToStreamOrDie(ctx, std::move(result));
} else {
if (ev->Get()->Retryable) {
- Request->ReplyUnavaliable();
+ TServerMessage serverMessage;
+ serverMessage.set_status(Ydb::StatusIds::UNAVAILABLE);
+ Request->GetStreamCtx()->WriteAndFinish(std::move(serverMessage), grpc::Status::OK);
} else {
Request->ReplyUnauthenticated("refreshed token is invalid");
}
diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
index e624997c83..b124bbd230 100644
--- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
@@ -1627,7 +1627,9 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(NGRpcService::TGRpcRequestP
}
} else {
if (ev->Get()->Retryable) {
- Request->ReplyUnavaliable();
+ TServerMessage serverMessage;
+ serverMessage.set_status(Ydb::StatusIds::UNAVAILABLE);
+ Request->GetStreamCtx()->WriteAndFinish(std::move(serverMessage), grpc::Status::OK);
} else {
Request->ReplyUnauthenticated("refreshed token is invalid");
}
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
index facb42ce47..bee353798f 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
@@ -1278,7 +1278,9 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NGRpcService::TGRpcRequest
}
} else {
if (ev->Get()->Retryable) {
- Request->ReplyUnavaliable();
+ TServerMessage serverMessage;
+ serverMessage.set_status(Ydb::StatusIds::UNAVAILABLE);
+ Request->GetStreamCtx()->WriteAndFinish(std::move(serverMessage), grpc::Status::OK);
} else {
Request->ReplyUnauthenticated("refreshed token is invalid");
}