aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2022-08-08 14:13:59 +0300
committersnaury <snaury@ydb.tech>2022-08-08 14:13:59 +0300
commitcd5f1d506209511017e977c7a7b53c4779d5adb1 (patch)
treefdebe754615fa34d512824320a13a21a8396a7e3
parent34d70b4d5019c91fed33d3aab2cdda46b61dd734 (diff)
downloadydb-cd5f1d506209511017e977c7a7b53c4779d5adb1.tar.gz
Catch incorrect grpc request flow and prevent double free,
-rw-r--r--library/cpp/grpc/server/grpc_request.h39
-rw-r--r--ydb/core/client/server/grpc_server.cpp28
2 files changed, 67 insertions, 0 deletions
diff --git a/library/cpp/grpc/server/grpc_request.h b/library/cpp/grpc/server/grpc_request.h
index daace3e5e9..b79a0d94fa 100644
--- a/library/cpp/grpc/server/grpc_request.h
+++ b/library/cpp/grpc/server/grpc_request.h
@@ -126,6 +126,7 @@ public:
if (auto guard = Server_->ProtectShutdown()) {
Ref(); //For grpc c runtime
this->Context.AsyncNotifyWhenDone(OnFinishTag.Prepare());
+ OnBeforeCall();
if (RequestCallback_) {
(this->Service->*RequestCallback_)
(&this->Context, Request_,
@@ -148,6 +149,8 @@ public:
}
void DestroyRequest() override {
+ Y_VERIFY(!CallInProgress_, "Unexpected DestroyRequest while another grpc call is still in progress");
+ RequestDestroyed_ = true;
if (RequestRegistered_) {
Server_->DeregisterRequestCtx(this);
RequestRegistered_ = false;
@@ -215,6 +218,8 @@ public:
GRPC_LOG_DEBUG(Logger_, "[%p] finished streaming Name# %s peer# %s (pushed to grpc)", this, Name_,
this->Context.peer().c_str());
+ OnBeforeCall();
+ Finished_ = true;
StreamWriter_->Finish(grpc::Status::OK, GetGRpcTag());
};
StreamAdaptor_->Enqueue(std::move(cb), false);
@@ -241,6 +246,19 @@ private:
}
}
+ void OnBeforeCall() {
+ Y_VERIFY(!RequestDestroyed_, "Cannot start grpc calls after request is already destroyed");
+ Y_VERIFY(!Finished_, "Cannot start grpc calls after request is finished");
+ bool wasInProgress = std::exchange(CallInProgress_, true);
+ Y_VERIFY(!wasInProgress, "Another grpc call is already in progress");
+ }
+
+ void OnAfterCall() {
+ Y_VERIFY(!RequestDestroyed_, "Finished grpc call after request is already destroyed");
+ bool wasInProgress = std::exchange(CallInProgress_, false);
+ Y_VERIFY(wasInProgress, "Finished grpc call that was not in progress");
+ }
+
void WriteDataOk(NProtoBuf::Message* resp) {
auto makeResponseString = [&] {
TString x;
@@ -257,6 +275,8 @@ private:
StateFunc_ = &TThis::SetFinishDone;
ResponseSize = sz;
Y_VERIFY(this->Context.c_call());
+ OnBeforeCall();
+ Finished_ = true;
Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag());
} else {
GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s (enqueued)",
@@ -270,6 +290,7 @@ private:
this, Name_, makeResponseString().data(), this->Context.peer().c_str());
StateFunc_ = &TThis::NextReply;
ResponseSize += sz;
+ OnBeforeCall();
StreamWriter_->Write(*uResp, GetGRpcTag());
};
StreamAdaptor_->Enqueue(std::move(cb), false);
@@ -283,6 +304,8 @@ private:
this->Context.peer().c_str());
StateFunc_ = &TThis::SetFinishDone;
ResponseSize = sz;
+ OnBeforeCall();
+ Finished_ = true;
Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag());
} else {
GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (enqueued)", this, Name_,
@@ -296,6 +319,7 @@ private:
this, Name_, this->Context.peer().c_str());
StateFunc_ = &TThis::NextReply;
ResponseSize += sz;
+ OnBeforeCall();
StreamWriter_->Write(*uResp, GetGRpcTag());
};
StreamAdaptor_->Enqueue(std::move(cb), false);
@@ -315,6 +339,8 @@ private:
Name_, msg.c_str(), this->Context.peer().c_str(), (int)code);
StateFunc_ = &TThis::SetFinishError;
TOut resp;
+ OnBeforeCall();
+ Finished_ = true;
Writer_->Finish(TUniversalResponseRef<TOut>(&resp), grpc::Status(code, msg, details), GetGRpcTag());
} else {
GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)"
@@ -324,6 +350,8 @@ private:
" (pushed to grpc)", this, Name_, msg.c_str(),
this->Context.peer().c_str(), (int)code);
StateFunc_ = &TThis::SetFinishError;
+ OnBeforeCall();
+ Finished_ = true;
StreamWriter_->Finish(grpc::Status(code, msg, details), GetGRpcTag());
};
StreamAdaptor_->Enqueue(std::move(cb), urgent);
@@ -331,6 +359,8 @@ private:
}
bool SetRequestDone(bool ok) {
+ OnAfterCall();
+
auto makeRequestString = [&] {
TString resp;
if (ok) {
@@ -398,6 +428,8 @@ private:
}
bool NextReply(bool ok) {
+ OnAfterCall();
+
auto logCb = [this, ok](int left) {
GRPC_LOG_DEBUG(Logger_, "[%p] ready for next reply Name# %s ok# %s peer# %s left# %d", this, Name_,
ok ? "true" : "false", this->Context.peer().c_str(), left);
@@ -423,6 +455,8 @@ private:
}
bool SetFinishDone(bool ok) {
+ OnAfterCall();
+
GRPC_LOG_DEBUG(Logger_, "[%p] finished request Name# %s ok# %s peer# %s", this, Name_,
ok ? "true" : "false", this->Context.peer().c_str());
//PrintBackTrace();
@@ -433,6 +467,8 @@ private:
}
bool SetFinishError(bool ok) {
+ OnAfterCall();
+
GRPC_LOG_DEBUG(Logger_, "[%p] finished request with error Name# %s ok# %s peer# %s", this, Name_,
ok ? "true" : "false", this->Context.peer().c_str());
if (!SkipUpdateCountersOnError) {
@@ -502,6 +538,9 @@ private:
THPTimer RequestTimer;
TAuthState AuthState_ = 0;
bool RequestRegistered_ = false;
+ bool RequestDestroyed_ = false;
+ bool CallInProgress_ = false;
+ bool Finished_ = false;
using TFixedEvent = TQueueFixedEvent<TGRpcRequestImpl>;
TFixedEvent OnFinishTag = { this, &TGRpcRequestImpl::OnFinish };
diff --git a/ydb/core/client/server/grpc_server.cpp b/ydb/core/client/server/grpc_server.cpp
index 86f37bfcbb..019ad3d129 100644
--- a/ydb/core/client/server/grpc_server.cpp
+++ b/ydb/core/client/server/grpc_server.cpp
@@ -94,6 +94,7 @@ public:
void Start() {
if (auto guard = Server->ProtectShutdown()) {
+ OnBeforeCall();
(Service->*RequestCallback)(&Context, &Request, Writer.Get(), CQ, CQ, GetGRpcTag());
} else {
// Server is shutting down, new requests cannot be started
@@ -115,6 +116,8 @@ public:
}
void DestroyRequest() override {
+ Y_VERIFY(!CallInProgress_, "Unexpected DestroyRequest while another grpc call is still in progress");
+ RequestDestroyed_ = true;
if (RequestRegistered_) {
Server->DeregisterRequestCtx(this);
RequestRegistered_ = false;
@@ -122,6 +125,20 @@ public:
delete this;
}
+private:
+ void OnBeforeCall() {
+ Y_VERIFY(!RequestDestroyed_, "Cannot start grpc calls after request is already destroyed");
+ Y_VERIFY(!Finished_, "Cannot start grpc calls after request is finished");
+ bool wasInProgress = std::exchange(CallInProgress_, true);
+ Y_VERIFY(!wasInProgress, "Another grpc call is already in progress");
+ }
+
+ void OnAfterCall() {
+ Y_VERIFY(!RequestDestroyed_, "Finished grpc call after request is already destroyed");
+ bool wasInProgress = std::exchange(CallInProgress_, false);
+ Y_VERIFY(wasInProgress, "Finished grpc call that was not in progress");
+ }
+
public:
//! Get pointer to the request's message.
const NProtoBuf::Message* GetRequest() const override {
@@ -270,6 +287,8 @@ private:
ResponseSize = resp.ByteSize();
ResponseStatus = status;
StateFunc = &TSimpleRequest::FinishDone;
+ OnBeforeCall();
+ Finished_ = true;
Writer->Finish(resp, Status::OK, GetGRpcTag());
}
@@ -280,12 +299,16 @@ private:
Name, Context.peer().c_str());
StateFunc = &TSimpleRequest::FinishDoneWithoutProcessing;
+ OnBeforeCall();
+ Finished_ = true;
Writer->Finish(resp,
grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED, msg),
GetGRpcTag());
}
bool RequestDone(bool ok) {
+ OnAfterCall();
+
auto makeRequestString = [&] {
TString resp;
if (ok) {
@@ -334,6 +357,7 @@ private:
}
bool FinishDone(bool ok) {
+ OnAfterCall();
LOG_DEBUG(ActorSystem, NKikimrServices::GRPC_SERVER, "[%p] finished request Name# %s ok# %s peer# %s", this,
Name, ok ? "true" : "false", Context.peer().c_str());
Counters->FinishProcessing(RequestSize, ResponseSize, ok, ResponseStatus,
@@ -345,6 +369,7 @@ private:
}
bool FinishDoneWithoutProcessing(bool ok) {
+ OnAfterCall();
LOG_DEBUG(ActorSystem, NKikimrServices::GRPC_SERVER, "[%p] finished request without processing Name# %s ok# %s peer# %s", this,
Name, ok ? "true" : "false", Context.peer().c_str());
@@ -373,6 +398,9 @@ private:
TMaybe<NMsgBusProxy::TBusMessageContext> BusContext;
bool InProgress_;
bool RequestRegistered_ = false;
+ bool RequestDestroyed_ = false;
+ bool CallInProgress_ = false;
+ bool Finished_ = false;
};
} // namespace