diff options
author | dcherednik <dcherednik@ydb.tech> | 2023-01-25 19:48:11 +0300 |
---|---|---|
committer | dcherednik <dcherednik@ydb.tech> | 2023-01-25 19:48:11 +0300 |
commit | d0c58909d8d578d279ac18aabca3f97763121f77 (patch) | |
tree | 84f82a8f0a872fe1d6cba3d750f0f1a4a7a1b7ce | |
parent | d796d4eb742557a71d57046ca7fbaedec3ce60a9 (diff) | |
download | ydb-d0c58909d8d578d279ac18aabca3f97763121f77.tar.gz |
Do not start processing reques in case of client disconnect or client timeout.
-rw-r--r-- | library/cpp/grpc/server/grpc_request.h | 6 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_request_base.h | 3 | ||||
-rw-r--r-- | ydb/core/grpc_services/base/base.h | 14 | ||||
-rw-r--r-- | ydb/core/grpc_services/grpc_request_check_actor.h | 13 | ||||
-rw-r--r-- | ydb/core/grpc_services/grpc_request_proxy.cpp | 8 | ||||
-rw-r--r-- | ydb/core/grpc_services/local_rpc/local_rpc.h | 2 | ||||
-rw-r--r-- | ydb/core/public_http/grpc_request_context_wrapper.h | 1 |
7 files changed, 44 insertions, 3 deletions
diff --git a/library/cpp/grpc/server/grpc_request.h b/library/cpp/grpc/server/grpc_request.h index c4b7e9c040e..4e869ef5f6f 100644 --- a/library/cpp/grpc/server/grpc_request.h +++ b/library/cpp/grpc/server/grpc_request.h @@ -113,6 +113,10 @@ public: return FinishPromise_.GetFuture(); } + bool IsClientLost() const override { + return ClientLost_.load(); + } + TString GetPeer() const override { return TString(this->Context.peer()); } @@ -496,6 +500,7 @@ private: void OnFinish(EQueueEventStatus evStatus) { if (this->Context.IsCancelled()) { + ClientLost_.store(true); FinishPromise_.SetValue(EFinishStatus::CANCEL); } else { FinishPromise_.SetValue(evStatus == EQueueEventStatus::OK ? EFinishStatus::OK : EFinishStatus::ERROR); @@ -556,6 +561,7 @@ private: NThreading::TPromise<EFinishStatus> FinishPromise_; bool SkipUpdateCountersOnError = false; IStreamAdaptor::TPtr StreamAdaptor_; + std::atomic<bool> ClientLost_ = false; }; template<typename TIn, typename TOut, typename TService, typename TInProtoPrinter=google::protobuf::TextFormat::Printer, typename TOutProtoPrinter=google::protobuf::TextFormat::Printer> diff --git a/library/cpp/grpc/server/grpc_request_base.h b/library/cpp/grpc/server/grpc_request_base.h index 42b78ed7df1..60b38805ede 100644 --- a/library/cpp/grpc/server/grpc_request_base.h +++ b/library/cpp/grpc/server/grpc_request_base.h @@ -116,6 +116,9 @@ public: //! Returns true if server is using ssl virtual bool SslServer() const = 0; + + //! Returns true if client was not interested in result (but we still must send response to make grpc happy) + virtual bool IsClientLost() const = 0; }; } // namespace NGrpc diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h index 197368c7a0a..28b75d5fc08 100644 --- a/ydb/core/grpc_services/base/base.h +++ b/ydb/core/grpc_services/base/base.h @@ -252,6 +252,7 @@ public: virtual const TMaybe<TString> GetDatabaseName() const = 0; // Returns "internal" token (result of ticket parser authentication) virtual const TString& GetInternalToken() const = 0; + virtual bool IsClientLost() const = 0; }; class IRequestCtxBase : public virtual IRequestCtxBaseMtSafe { @@ -504,6 +505,10 @@ public: void SetRlPath(TMaybe<NRpcService::TRlPath>&&) override { } + bool IsClientLost() const override { + return false; + } + TMaybe<NRpcService::TRlPath> GetRlPath() const override { return Nothing(); } @@ -656,6 +661,11 @@ public: , RlAllowed_(rlAllowed) { } + bool IsClientLost() const override { + // TODO: Implement for BiDirectional streaming + return false; + } + TRateLimiterMode GetRlMode() const override { return RlAllowed_ ? RateLimitMode : TRateLimiterMode::Off; } @@ -1119,6 +1129,10 @@ public: Ctx_->GetFinishFuture().Subscribe(std::move(shutdown)); } + bool IsClientLost() const override { + return Ctx_->IsClientLost(); + } + void FinishStream() override { Ctx_->FinishStreamingOk(); } diff --git a/ydb/core/grpc_services/grpc_request_check_actor.h b/ydb/core/grpc_services/grpc_request_check_actor.h index e1cd950744e..0cb310c7226 100644 --- a/ydb/core/grpc_services/grpc_request_check_actor.h +++ b/ydb/core/grpc_services/grpc_request_check_actor.h @@ -197,9 +197,16 @@ public: } void SetTokenAndDie() { - GrpcRequestBaseCtx_->UpdateAuthState(NGrpc::TAuthState::AS_OK); - GrpcRequestBaseCtx_->SetInternalToken(TBase::GetSerializedToken()); - ReplyBackAndDie(); + if (GrpcRequestBaseCtx_->IsClientLost()) { + LOG_DEBUG(*TlsActivationContext, NKikimrServices::GRPC_SERVER, + "Client was disconnected before processing request (check actor)"); + const NYql::TIssues issues; + ReplyUnavailableAndDie(issues); + } else { + GrpcRequestBaseCtx_->UpdateAuthState(NGrpc::TAuthState::AS_OK); + GrpcRequestBaseCtx_->SetInternalToken(TBase::GetSerializedToken()); + ReplyBackAndDie(); + } } STATEFN(DbAccessStateFunc) { diff --git a/ydb/core/grpc_services/grpc_request_proxy.cpp b/ydb/core/grpc_services/grpc_request_proxy.cpp index 1188eb13d9d..778b424ab1c 100644 --- a/ydb/core/grpc_services/grpc_request_proxy.cpp +++ b/ydb/core/grpc_services/grpc_request_proxy.cpp @@ -236,6 +236,14 @@ private: return; } + if (requestBaseCtx->IsClientLost()) { + // Any status here + LOG_DEBUG(*TlsActivationContext, NKikimrServices::GRPC_SERVER, + "Client was disconnected before processing request (grpc request proxy)"); + requestBaseCtx->ReplyWithYdbStatus(Ydb::StatusIds::UNAVAILABLE); + return; + } + Register(CreateGrpcRequestCheckActor<TEvent>(SelfId(), database->SchemeBoardResult->DescribeSchemeResult, database->SecurityObject, event.Release(), Counters, skipCheckConnectRigths)); diff --git a/ydb/core/grpc_services/local_rpc/local_rpc.h b/ydb/core/grpc_services/local_rpc/local_rpc.h index e6a87677c6f..7be82d6a5c0 100644 --- a/ydb/core/grpc_services/local_rpc/local_rpc.h +++ b/ydb/core/grpc_services/local_rpc/local_rpc.h @@ -148,6 +148,8 @@ public: void SetClientLostAction(std::function<void()>&&) override {} + bool IsClientLost() const override { return false; } + void AddServerHint(const TString&) override {} void SetRuHeader(ui64) override {} diff --git a/ydb/core/public_http/grpc_request_context_wrapper.h b/ydb/core/public_http/grpc_request_context_wrapper.h index 9dc718e633a..964e33ef3a2 100644 --- a/ydb/core/public_http/grpc_request_context_wrapper.h +++ b/ydb/core/public_http/grpc_request_context_wrapper.h @@ -46,6 +46,7 @@ public: virtual void SetNextReplyCallback(TOnNextReply&&) {} virtual void FinishStreamingOk() {} virtual TAsyncFinishResult GetFinishFuture() { return {}; } + virtual bool IsClientLost() const { return false; } virtual TString GetPeer() const { return {}; } virtual bool SslServer() const { return false; } }; |