aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordcherednik <dcherednik@ydb.tech>2023-01-25 19:48:11 +0300
committerdcherednik <dcherednik@ydb.tech>2023-01-25 19:48:11 +0300
commitd0c58909d8d578d279ac18aabca3f97763121f77 (patch)
tree84f82a8f0a872fe1d6cba3d750f0f1a4a7a1b7ce
parentd796d4eb742557a71d57046ca7fbaedec3ce60a9 (diff)
downloadydb-d0c58909d8d578d279ac18aabca3f97763121f77.tar.gz
Do not start processing reques in case of client disconnect or client timeout.
-rw-r--r--library/cpp/grpc/server/grpc_request.h6
-rw-r--r--library/cpp/grpc/server/grpc_request_base.h3
-rw-r--r--ydb/core/grpc_services/base/base.h14
-rw-r--r--ydb/core/grpc_services/grpc_request_check_actor.h13
-rw-r--r--ydb/core/grpc_services/grpc_request_proxy.cpp8
-rw-r--r--ydb/core/grpc_services/local_rpc/local_rpc.h2
-rw-r--r--ydb/core/public_http/grpc_request_context_wrapper.h1
7 files changed, 44 insertions, 3 deletions
diff --git a/library/cpp/grpc/server/grpc_request.h b/library/cpp/grpc/server/grpc_request.h
index c4b7e9c040e..4e869ef5f6f 100644
--- a/library/cpp/grpc/server/grpc_request.h
+++ b/library/cpp/grpc/server/grpc_request.h
@@ -113,6 +113,10 @@ public:
return FinishPromise_.GetFuture();
}
+ bool IsClientLost() const override {
+ return ClientLost_.load();
+ }
+
TString GetPeer() const override {
return TString(this->Context.peer());
}
@@ -496,6 +500,7 @@ private:
void OnFinish(EQueueEventStatus evStatus) {
if (this->Context.IsCancelled()) {
+ ClientLost_.store(true);
FinishPromise_.SetValue(EFinishStatus::CANCEL);
} else {
FinishPromise_.SetValue(evStatus == EQueueEventStatus::OK ? EFinishStatus::OK : EFinishStatus::ERROR);
@@ -556,6 +561,7 @@ private:
NThreading::TPromise<EFinishStatus> FinishPromise_;
bool SkipUpdateCountersOnError = false;
IStreamAdaptor::TPtr StreamAdaptor_;
+ std::atomic<bool> ClientLost_ = false;
};
template<typename TIn, typename TOut, typename TService, typename TInProtoPrinter=google::protobuf::TextFormat::Printer, typename TOutProtoPrinter=google::protobuf::TextFormat::Printer>
diff --git a/library/cpp/grpc/server/grpc_request_base.h b/library/cpp/grpc/server/grpc_request_base.h
index 42b78ed7df1..60b38805ede 100644
--- a/library/cpp/grpc/server/grpc_request_base.h
+++ b/library/cpp/grpc/server/grpc_request_base.h
@@ -116,6 +116,9 @@ public:
//! Returns true if server is using ssl
virtual bool SslServer() const = 0;
+
+ //! Returns true if client was not interested in result (but we still must send response to make grpc happy)
+ virtual bool IsClientLost() const = 0;
};
} // namespace NGrpc
diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h
index 197368c7a0a..28b75d5fc08 100644
--- a/ydb/core/grpc_services/base/base.h
+++ b/ydb/core/grpc_services/base/base.h
@@ -252,6 +252,7 @@ public:
virtual const TMaybe<TString> GetDatabaseName() const = 0;
// Returns "internal" token (result of ticket parser authentication)
virtual const TString& GetInternalToken() const = 0;
+ virtual bool IsClientLost() const = 0;
};
class IRequestCtxBase : public virtual IRequestCtxBaseMtSafe {
@@ -504,6 +505,10 @@ public:
void SetRlPath(TMaybe<NRpcService::TRlPath>&&) override {
}
+ bool IsClientLost() const override {
+ return false;
+ }
+
TMaybe<NRpcService::TRlPath> GetRlPath() const override {
return Nothing();
}
@@ -656,6 +661,11 @@ public:
, RlAllowed_(rlAllowed)
{ }
+ bool IsClientLost() const override {
+ // TODO: Implement for BiDirectional streaming
+ return false;
+ }
+
TRateLimiterMode GetRlMode() const override {
return RlAllowed_ ? RateLimitMode : TRateLimiterMode::Off;
}
@@ -1119,6 +1129,10 @@ public:
Ctx_->GetFinishFuture().Subscribe(std::move(shutdown));
}
+ bool IsClientLost() const override {
+ return Ctx_->IsClientLost();
+ }
+
void FinishStream() override {
Ctx_->FinishStreamingOk();
}
diff --git a/ydb/core/grpc_services/grpc_request_check_actor.h b/ydb/core/grpc_services/grpc_request_check_actor.h
index e1cd950744e..0cb310c7226 100644
--- a/ydb/core/grpc_services/grpc_request_check_actor.h
+++ b/ydb/core/grpc_services/grpc_request_check_actor.h
@@ -197,9 +197,16 @@ public:
}
void SetTokenAndDie() {
- GrpcRequestBaseCtx_->UpdateAuthState(NGrpc::TAuthState::AS_OK);
- GrpcRequestBaseCtx_->SetInternalToken(TBase::GetSerializedToken());
- ReplyBackAndDie();
+ if (GrpcRequestBaseCtx_->IsClientLost()) {
+ LOG_DEBUG(*TlsActivationContext, NKikimrServices::GRPC_SERVER,
+ "Client was disconnected before processing request (check actor)");
+ const NYql::TIssues issues;
+ ReplyUnavailableAndDie(issues);
+ } else {
+ GrpcRequestBaseCtx_->UpdateAuthState(NGrpc::TAuthState::AS_OK);
+ GrpcRequestBaseCtx_->SetInternalToken(TBase::GetSerializedToken());
+ ReplyBackAndDie();
+ }
}
STATEFN(DbAccessStateFunc) {
diff --git a/ydb/core/grpc_services/grpc_request_proxy.cpp b/ydb/core/grpc_services/grpc_request_proxy.cpp
index 1188eb13d9d..778b424ab1c 100644
--- a/ydb/core/grpc_services/grpc_request_proxy.cpp
+++ b/ydb/core/grpc_services/grpc_request_proxy.cpp
@@ -236,6 +236,14 @@ private:
return;
}
+ if (requestBaseCtx->IsClientLost()) {
+ // Any status here
+ LOG_DEBUG(*TlsActivationContext, NKikimrServices::GRPC_SERVER,
+ "Client was disconnected before processing request (grpc request proxy)");
+ requestBaseCtx->ReplyWithYdbStatus(Ydb::StatusIds::UNAVAILABLE);
+ return;
+ }
+
Register(CreateGrpcRequestCheckActor<TEvent>(SelfId(),
database->SchemeBoardResult->DescribeSchemeResult,
database->SecurityObject, event.Release(), Counters, skipCheckConnectRigths));
diff --git a/ydb/core/grpc_services/local_rpc/local_rpc.h b/ydb/core/grpc_services/local_rpc/local_rpc.h
index e6a87677c6f..7be82d6a5c0 100644
--- a/ydb/core/grpc_services/local_rpc/local_rpc.h
+++ b/ydb/core/grpc_services/local_rpc/local_rpc.h
@@ -148,6 +148,8 @@ public:
void SetClientLostAction(std::function<void()>&&) override {}
+ bool IsClientLost() const override { return false; }
+
void AddServerHint(const TString&) override {}
void SetRuHeader(ui64) override {}
diff --git a/ydb/core/public_http/grpc_request_context_wrapper.h b/ydb/core/public_http/grpc_request_context_wrapper.h
index 9dc718e633a..964e33ef3a2 100644
--- a/ydb/core/public_http/grpc_request_context_wrapper.h
+++ b/ydb/core/public_http/grpc_request_context_wrapper.h
@@ -46,6 +46,7 @@ public:
virtual void SetNextReplyCallback(TOnNextReply&&) {}
virtual void FinishStreamingOk() {}
virtual TAsyncFinishResult GetFinishFuture() { return {}; }
+ virtual bool IsClientLost() const { return false; }
virtual TString GetPeer() const { return {}; }
virtual bool SslServer() const { return false; }
};