diff options
author | Daniil Cherednik <dcherednik@ydb.tech> | 2024-01-18 10:58:03 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-18 10:58:03 +0100 |
commit | fa40ee8c90448fa7bab0264d40012edd8f117b29 (patch) | |
tree | 8158a560345bca5dd8205f068fe03d2594db6ab4 | |
parent | 0a965144219a01dadef957ee280b78147d0667f7 (diff) | |
download | ydb-fa40ee8c90448fa7bab0264d40012edd8f117b29.tar.gz |
Fix possible node crash in case of unavaliable response (#1107)
during refresh token process.
The GrpcRequestProxy Unavaliable method should not be used from
rpc actors. The interface will be isolated in a future.
3 files changed, 9 insertions, 3 deletions
diff --git a/ydb/services/persqueue_v1/actors/direct_read_actor.cpp b/ydb/services/persqueue_v1/actors/direct_read_actor.cpp index 0e1fcf0414..883e966ab0 100644 --- a/ydb/services/persqueue_v1/actors/direct_read_actor.cpp +++ b/ydb/services/persqueue_v1/actors/direct_read_actor.cpp @@ -364,7 +364,9 @@ void TDirectReadSessionActor::Handle(NGRpcService::TGRpcRequestProxy::TEvRefresh WriteToStreamOrDie(ctx, std::move(result)); } else { if (ev->Get()->Retryable) { - Request->ReplyUnavaliable(); + TServerMessage serverMessage; + serverMessage.set_status(Ydb::StatusIds::UNAVAILABLE); + Request->GetStreamCtx()->WriteAndFinish(std::move(serverMessage), grpc::Status::OK); } else { Request->ReplyUnauthenticated("refreshed token is invalid"); } diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp index e624997c83..b124bbd230 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp @@ -1627,7 +1627,9 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(NGRpcService::TGRpcRequestP } } else { if (ev->Get()->Retryable) { - Request->ReplyUnavaliable(); + TServerMessage serverMessage; + serverMessage.set_status(Ydb::StatusIds::UNAVAILABLE); + Request->GetStreamCtx()->WriteAndFinish(std::move(serverMessage), grpc::Status::OK); } else { Request->ReplyUnauthenticated("refreshed token is invalid"); } diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp index facb42ce47..bee353798f 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp @@ -1278,7 +1278,9 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NGRpcService::TGRpcRequest } } else { if (ev->Get()->Retryable) { - Request->ReplyUnavaliable(); + TServerMessage serverMessage; + serverMessage.set_status(Ydb::StatusIds::UNAVAILABLE); + Request->GetStreamCtx()->WriteAndFinish(std::move(serverMessage), grpc::Status::OK); } else { Request->ReplyUnauthenticated("refreshed token is invalid"); } |