diff options
author | snaury <snaury@ydb.tech> | 2022-08-08 14:13:59 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2022-08-08 14:13:59 +0300 |
commit | cd5f1d506209511017e977c7a7b53c4779d5adb1 (patch) | |
tree | fdebe754615fa34d512824320a13a21a8396a7e3 | |
parent | 34d70b4d5019c91fed33d3aab2cdda46b61dd734 (diff) | |
download | ydb-cd5f1d506209511017e977c7a7b53c4779d5adb1.tar.gz |
Catch incorrect grpc request flow and prevent double free,
-rw-r--r-- | library/cpp/grpc/server/grpc_request.h | 39 | ||||
-rw-r--r-- | ydb/core/client/server/grpc_server.cpp | 28 |
2 files changed, 67 insertions, 0 deletions
diff --git a/library/cpp/grpc/server/grpc_request.h b/library/cpp/grpc/server/grpc_request.h index daace3e5e9..b79a0d94fa 100644 --- a/library/cpp/grpc/server/grpc_request.h +++ b/library/cpp/grpc/server/grpc_request.h @@ -126,6 +126,7 @@ public: if (auto guard = Server_->ProtectShutdown()) { Ref(); //For grpc c runtime this->Context.AsyncNotifyWhenDone(OnFinishTag.Prepare()); + OnBeforeCall(); if (RequestCallback_) { (this->Service->*RequestCallback_) (&this->Context, Request_, @@ -148,6 +149,8 @@ public: } void DestroyRequest() override { + Y_VERIFY(!CallInProgress_, "Unexpected DestroyRequest while another grpc call is still in progress"); + RequestDestroyed_ = true; if (RequestRegistered_) { Server_->DeregisterRequestCtx(this); RequestRegistered_ = false; @@ -215,6 +218,8 @@ public: GRPC_LOG_DEBUG(Logger_, "[%p] finished streaming Name# %s peer# %s (pushed to grpc)", this, Name_, this->Context.peer().c_str()); + OnBeforeCall(); + Finished_ = true; StreamWriter_->Finish(grpc::Status::OK, GetGRpcTag()); }; StreamAdaptor_->Enqueue(std::move(cb), false); @@ -241,6 +246,19 @@ private: } } + void OnBeforeCall() { + Y_VERIFY(!RequestDestroyed_, "Cannot start grpc calls after request is already destroyed"); + Y_VERIFY(!Finished_, "Cannot start grpc calls after request is finished"); + bool wasInProgress = std::exchange(CallInProgress_, true); + Y_VERIFY(!wasInProgress, "Another grpc call is already in progress"); + } + + void OnAfterCall() { + Y_VERIFY(!RequestDestroyed_, "Finished grpc call after request is already destroyed"); + bool wasInProgress = std::exchange(CallInProgress_, false); + Y_VERIFY(wasInProgress, "Finished grpc call that was not in progress"); + } + void WriteDataOk(NProtoBuf::Message* resp) { auto makeResponseString = [&] { TString x; @@ -257,6 +275,8 @@ private: StateFunc_ = &TThis::SetFinishDone; ResponseSize = sz; Y_VERIFY(this->Context.c_call()); + OnBeforeCall(); + Finished_ = true; Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag()); } else { GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s (enqueued)", @@ -270,6 +290,7 @@ private: this, Name_, makeResponseString().data(), this->Context.peer().c_str()); StateFunc_ = &TThis::NextReply; ResponseSize += sz; + OnBeforeCall(); StreamWriter_->Write(*uResp, GetGRpcTag()); }; StreamAdaptor_->Enqueue(std::move(cb), false); @@ -283,6 +304,8 @@ private: this->Context.peer().c_str()); StateFunc_ = &TThis::SetFinishDone; ResponseSize = sz; + OnBeforeCall(); + Finished_ = true; Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag()); } else { GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (enqueued)", this, Name_, @@ -296,6 +319,7 @@ private: this, Name_, this->Context.peer().c_str()); StateFunc_ = &TThis::NextReply; ResponseSize += sz; + OnBeforeCall(); StreamWriter_->Write(*uResp, GetGRpcTag()); }; StreamAdaptor_->Enqueue(std::move(cb), false); @@ -315,6 +339,8 @@ private: Name_, msg.c_str(), this->Context.peer().c_str(), (int)code); StateFunc_ = &TThis::SetFinishError; TOut resp; + OnBeforeCall(); + Finished_ = true; Writer_->Finish(TUniversalResponseRef<TOut>(&resp), grpc::Status(code, msg, details), GetGRpcTag()); } else { GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)" @@ -324,6 +350,8 @@ private: " (pushed to grpc)", this, Name_, msg.c_str(), this->Context.peer().c_str(), (int)code); StateFunc_ = &TThis::SetFinishError; + OnBeforeCall(); + Finished_ = true; StreamWriter_->Finish(grpc::Status(code, msg, details), GetGRpcTag()); }; StreamAdaptor_->Enqueue(std::move(cb), urgent); @@ -331,6 +359,8 @@ private: } bool SetRequestDone(bool ok) { + OnAfterCall(); + auto makeRequestString = [&] { TString resp; if (ok) { @@ -398,6 +428,8 @@ private: } bool NextReply(bool ok) { + OnAfterCall(); + auto logCb = [this, ok](int left) { GRPC_LOG_DEBUG(Logger_, "[%p] ready for next reply Name# %s ok# %s peer# %s left# %d", this, Name_, ok ? "true" : "false", this->Context.peer().c_str(), left); @@ -423,6 +455,8 @@ private: } bool SetFinishDone(bool ok) { + OnAfterCall(); + GRPC_LOG_DEBUG(Logger_, "[%p] finished request Name# %s ok# %s peer# %s", this, Name_, ok ? "true" : "false", this->Context.peer().c_str()); //PrintBackTrace(); @@ -433,6 +467,8 @@ private: } bool SetFinishError(bool ok) { + OnAfterCall(); + GRPC_LOG_DEBUG(Logger_, "[%p] finished request with error Name# %s ok# %s peer# %s", this, Name_, ok ? "true" : "false", this->Context.peer().c_str()); if (!SkipUpdateCountersOnError) { @@ -502,6 +538,9 @@ private: THPTimer RequestTimer; TAuthState AuthState_ = 0; bool RequestRegistered_ = false; + bool RequestDestroyed_ = false; + bool CallInProgress_ = false; + bool Finished_ = false; using TFixedEvent = TQueueFixedEvent<TGRpcRequestImpl>; TFixedEvent OnFinishTag = { this, &TGRpcRequestImpl::OnFinish }; diff --git a/ydb/core/client/server/grpc_server.cpp b/ydb/core/client/server/grpc_server.cpp index 86f37bfcbb..019ad3d129 100644 --- a/ydb/core/client/server/grpc_server.cpp +++ b/ydb/core/client/server/grpc_server.cpp @@ -94,6 +94,7 @@ public: void Start() { if (auto guard = Server->ProtectShutdown()) { + OnBeforeCall(); (Service->*RequestCallback)(&Context, &Request, Writer.Get(), CQ, CQ, GetGRpcTag()); } else { // Server is shutting down, new requests cannot be started @@ -115,6 +116,8 @@ public: } void DestroyRequest() override { + Y_VERIFY(!CallInProgress_, "Unexpected DestroyRequest while another grpc call is still in progress"); + RequestDestroyed_ = true; if (RequestRegistered_) { Server->DeregisterRequestCtx(this); RequestRegistered_ = false; @@ -122,6 +125,20 @@ public: delete this; } +private: + void OnBeforeCall() { + Y_VERIFY(!RequestDestroyed_, "Cannot start grpc calls after request is already destroyed"); + Y_VERIFY(!Finished_, "Cannot start grpc calls after request is finished"); + bool wasInProgress = std::exchange(CallInProgress_, true); + Y_VERIFY(!wasInProgress, "Another grpc call is already in progress"); + } + + void OnAfterCall() { + Y_VERIFY(!RequestDestroyed_, "Finished grpc call after request is already destroyed"); + bool wasInProgress = std::exchange(CallInProgress_, false); + Y_VERIFY(wasInProgress, "Finished grpc call that was not in progress"); + } + public: //! Get pointer to the request's message. const NProtoBuf::Message* GetRequest() const override { @@ -270,6 +287,8 @@ private: ResponseSize = resp.ByteSize(); ResponseStatus = status; StateFunc = &TSimpleRequest::FinishDone; + OnBeforeCall(); + Finished_ = true; Writer->Finish(resp, Status::OK, GetGRpcTag()); } @@ -280,12 +299,16 @@ private: Name, Context.peer().c_str()); StateFunc = &TSimpleRequest::FinishDoneWithoutProcessing; + OnBeforeCall(); + Finished_ = true; Writer->Finish(resp, grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED, msg), GetGRpcTag()); } bool RequestDone(bool ok) { + OnAfterCall(); + auto makeRequestString = [&] { TString resp; if (ok) { @@ -334,6 +357,7 @@ private: } bool FinishDone(bool ok) { + OnAfterCall(); LOG_DEBUG(ActorSystem, NKikimrServices::GRPC_SERVER, "[%p] finished request Name# %s ok# %s peer# %s", this, Name, ok ? "true" : "false", Context.peer().c_str()); Counters->FinishProcessing(RequestSize, ResponseSize, ok, ResponseStatus, @@ -345,6 +369,7 @@ private: } bool FinishDoneWithoutProcessing(bool ok) { + OnAfterCall(); LOG_DEBUG(ActorSystem, NKikimrServices::GRPC_SERVER, "[%p] finished request without processing Name# %s ok# %s peer# %s", this, Name, ok ? "true" : "false", Context.peer().c_str()); @@ -373,6 +398,9 @@ private: TMaybe<NMsgBusProxy::TBusMessageContext> BusContext; bool InProgress_; bool RequestRegistered_ = false; + bool RequestDestroyed_ = false; + bool CallInProgress_ = false; + bool Finished_ = false; }; } // namespace |