aboutsummaryrefslogtreecommitdiffstats
path: root/library
diff options
context:
space:
mode:
authordcherednik <dcherednik@ydb.tech>2023-07-19 22:38:50 +0300
committerdcherednik <dcherednik@ydb.tech>2023-07-19 22:38:50 +0300
commitdeace08df3d1ae65cd93987981f35f5cbe40b9d1 (patch)
tree57fa1313d62cf934752c6d6fa2f8bf84c1f6a7e9 /library
parenta557863565c89afa62a4b32caf33985f983bcddb (diff)
downloadydb-deace08df3d1ae65cd93987981f35f5cbe40b9d1.tar.gz
Fix data race in case of handling clinet timeout during streaming call. Fix possible use after free in case of enabled grpc debug log and streaming call. KIKIMR-18801
Diffstat (limited to 'library')
-rw-r--r--library/cpp/grpc/server/grpc_request.h22
1 files changed, 12 insertions, 10 deletions
diff --git a/library/cpp/grpc/server/grpc_request.h b/library/cpp/grpc/server/grpc_request.h
index d9871f801d..caf47e2677 100644
--- a/library/cpp/grpc/server/grpc_request.h
+++ b/library/cpp/grpc/server/grpc_request.h
@@ -201,13 +201,11 @@ public:
}
void Reply(NProtoBuf::Message* resp, ui32 status) override {
- ResponseStatus = status;
- WriteDataOk(resp);
+ WriteDataOk(resp, status);
}
void Reply(grpc::ByteBuffer* resp, ui32 status) override {
- ResponseStatus = status;
- WriteByteDataOk(resp);
+ WriteByteDataOk(resp, status);
}
void ReplyError(grpc::StatusCode code, const TString& msg, const TString& details) override {
@@ -276,7 +274,7 @@ private:
Y_VERIFY(wasInProgress, "Finished grpc call that was not in progress");
}
- void WriteDataOk(NProtoBuf::Message* resp) {
+ void WriteDataOk(NProtoBuf::Message* resp, ui32 status) {
auto makeResponseString = [&] {
TString x;
TOutProtoPrinter printer;
@@ -291,6 +289,7 @@ private:
makeResponseString().data(), this->Context.peer().c_str());
StateFunc_ = &TThis::SetFinishDone;
ResponseSize = sz;
+ ResponseStatus = status;
Y_VERIFY(this->Context.c_call());
OnBeforeCall();
Finished_ = true;
@@ -302,11 +301,12 @@ private:
// because of std::function cannot hold move-only captured object
// we allocate shared object on heap to avoid message copy
auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp);
- auto cb = [this, uResp = std::move(uResp), sz, &makeResponseString]() {
- GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s (pushed to grpc)",
- this, Name_, makeResponseString().data(), this->Context.peer().c_str());
+ auto cb = [this, uResp = std::move(uResp), sz, status]() {
+ GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s peer# %s (pushed to grpc)",
+ this, Name_, this->Context.peer().c_str());
StateFunc_ = &TThis::NextReply;
ResponseSize += sz;
+ ResponseStatus = status;
OnBeforeCall();
StreamWriter_->Write(*uResp, GetGRpcTag());
};
@@ -314,13 +314,14 @@ private:
}
}
- void WriteByteDataOk(grpc::ByteBuffer* resp) {
+ void WriteByteDataOk(grpc::ByteBuffer* resp, ui32 status) {
auto sz = resp->Length();
if (Writer_) {
GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s", this, Name_,
this->Context.peer().c_str());
StateFunc_ = &TThis::SetFinishDone;
ResponseSize = sz;
+ ResponseStatus = status;
OnBeforeCall();
Finished_ = true;
Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag());
@@ -331,11 +332,12 @@ private:
// because of std::function cannot hold move-only captured object
// we allocate shared object on heap to avoid buffer copy
auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp);
- auto cb = [this, uResp = std::move(uResp), sz]() {
+ auto cb = [this, uResp = std::move(uResp), sz, status]() {
GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (pushed to grpc)",
this, Name_, this->Context.peer().c_str());
StateFunc_ = &TThis::NextReply;
ResponseSize += sz;
+ ResponseStatus = status;
OnBeforeCall();
StreamWriter_->Write(*uResp, GetGRpcTag());
};