summaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authordcherednik <[email protected]>2023-07-19 22:38:50 +0300
committerdcherednik <[email protected]>2023-07-19 22:38:50 +0300
commitdeace08df3d1ae65cd93987981f35f5cbe40b9d1 (patch)
tree57fa1313d62cf934752c6d6fa2f8bf84c1f6a7e9 /library/cpp
parenta557863565c89afa62a4b32caf33985f983bcddb (diff)
Fix data race in case of handling clinet timeout during streaming call. Fix possible use after free in case of enabled grpc debug log and streaming call. KIKIMR-18801
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/grpc/server/grpc_request.h22
1 files changed, 12 insertions, 10 deletions
diff --git a/library/cpp/grpc/server/grpc_request.h b/library/cpp/grpc/server/grpc_request.h
index d9871f801d7..caf47e26771 100644
--- a/library/cpp/grpc/server/grpc_request.h
+++ b/library/cpp/grpc/server/grpc_request.h
@@ -201,13 +201,11 @@ public:
}
void Reply(NProtoBuf::Message* resp, ui32 status) override {
- ResponseStatus = status;
- WriteDataOk(resp);
+ WriteDataOk(resp, status);
}
void Reply(grpc::ByteBuffer* resp, ui32 status) override {
- ResponseStatus = status;
- WriteByteDataOk(resp);
+ WriteByteDataOk(resp, status);
}
void ReplyError(grpc::StatusCode code, const TString& msg, const TString& details) override {
@@ -276,7 +274,7 @@ private:
Y_VERIFY(wasInProgress, "Finished grpc call that was not in progress");
}
- void WriteDataOk(NProtoBuf::Message* resp) {
+ void WriteDataOk(NProtoBuf::Message* resp, ui32 status) {
auto makeResponseString = [&] {
TString x;
TOutProtoPrinter printer;
@@ -291,6 +289,7 @@ private:
makeResponseString().data(), this->Context.peer().c_str());
StateFunc_ = &TThis::SetFinishDone;
ResponseSize = sz;
+ ResponseStatus = status;
Y_VERIFY(this->Context.c_call());
OnBeforeCall();
Finished_ = true;
@@ -302,11 +301,12 @@ private:
// because of std::function cannot hold move-only captured object
// we allocate shared object on heap to avoid message copy
auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp);
- auto cb = [this, uResp = std::move(uResp), sz, &makeResponseString]() {
- GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s (pushed to grpc)",
- this, Name_, makeResponseString().data(), this->Context.peer().c_str());
+ auto cb = [this, uResp = std::move(uResp), sz, status]() {
+ GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s peer# %s (pushed to grpc)",
+ this, Name_, this->Context.peer().c_str());
StateFunc_ = &TThis::NextReply;
ResponseSize += sz;
+ ResponseStatus = status;
OnBeforeCall();
StreamWriter_->Write(*uResp, GetGRpcTag());
};
@@ -314,13 +314,14 @@ private:
}
}
- void WriteByteDataOk(grpc::ByteBuffer* resp) {
+ void WriteByteDataOk(grpc::ByteBuffer* resp, ui32 status) {
auto sz = resp->Length();
if (Writer_) {
GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s", this, Name_,
this->Context.peer().c_str());
StateFunc_ = &TThis::SetFinishDone;
ResponseSize = sz;
+ ResponseStatus = status;
OnBeforeCall();
Finished_ = true;
Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag());
@@ -331,11 +332,12 @@ private:
// because of std::function cannot hold move-only captured object
// we allocate shared object on heap to avoid buffer copy
auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp);
- auto cb = [this, uResp = std::move(uResp), sz]() {
+ auto cb = [this, uResp = std::move(uResp), sz, status]() {
GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (pushed to grpc)",
this, Name_, this->Context.peer().c_str());
StateFunc_ = &TThis::NextReply;
ResponseSize += sz;
+ ResponseStatus = status;
OnBeforeCall();
StreamWriter_->Write(*uResp, GetGRpcTag());
};