summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDaniil Cherednik <[email protected]>2024-04-08 18:03:11 +0200
committerGitHub <[email protected]>2024-04-08 19:03:11 +0300
commit6041652a3a08d618ea7db65de51b56d3aa7c2370 (patch)
treed9ca3000b5bf484d6235fa94e6dc3c5db835b795
parent4a55e1b89fddff8db2cd5224e2498fb6d0881735 (diff)
The grpc stream must be finished in case of grpc proxy emmited YDB error (#3560)
Changelog entry #3489 Changelog category Bugfix
-rw-r--r--ydb/core/grpc_services/base/base.h20
-rw-r--r--ydb/core/grpc_services/grpc_request_proxy.cpp6
-rw-r--r--ydb/core/grpc_services/grpc_request_proxy_simple.cpp2
-rw-r--r--ydb/core/grpc_services/local_grpc/local_grpc.h1
-rw-r--r--ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp4
-rw-r--r--ydb/core/public_http/grpc_request_context_wrapper.h1
-rw-r--r--ydb/library/grpc/server/grpc_request.h4
-rw-r--r--ydb/library/grpc/server/grpc_request_base.h2
8 files changed, 17 insertions, 23 deletions
diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h
index 66155cce6bd..510a55061b1 100644
--- a/ydb/core/grpc_services/base/base.h
+++ b/ydb/core/grpc_services/base/base.h
@@ -359,7 +359,6 @@ class IRequestProxyCtx
friend class TGRpcRequestProxySimple;
friend class TGRpcRequestProxyHandleMethods;
private:
- virtual void ReplyUnavaliable() = 0;
virtual void ReplyWithYdbStatus(Ydb::StatusIds::StatusCode status) = 0;
public:
virtual ~IRequestProxyCtx() = default;
@@ -574,7 +573,7 @@ public:
}
void ReplyUnauthenticated(const TString&) override;
- void ReplyUnavaliable() override;
+ void ReplyUnavaliable();
void ReplyWithYdbStatus(Ydb::StatusIds::StatusCode status) override {
switch (status) {
case Ydb::StatusIds::UNAVAILABLE:
@@ -719,13 +718,6 @@ class TGRpcRequestBiStreamWrapper
, public TEventLocal<TGRpcRequestBiStreamWrapper<TRpcId, TReq, TResp, RlMode>, TRpcId>
{
private:
- void ReplyUnavaliable() override {
- Ctx_->Attach(TActorId());
- TResponse resp;
- FillYdbStatus(resp, IssueManager_.GetIssues(), Ydb::StatusIds::UNAVAILABLE);
- Ctx_->WriteAndFinish(std::move(resp), grpc::Status::OK);
- }
-
void ReplyWithYdbStatus(Ydb::StatusIds::StatusCode status) override {
Ctx_->Attach(TActorId());
TResponse resp;
@@ -1114,18 +1106,14 @@ public:
Ctx_->UseDatabase(database);
}
- void ReplyUnavaliable() override {
- TResponse* resp = CreateResponseMessage();
- TCommonResponseFiller<TResp, TDerived::IsOp>::Fill(*resp, IssueManager.GetIssues(), CostInfo, Ydb::StatusIds::UNAVAILABLE);
- FinishRequest();
- Reply(resp, Ydb::StatusIds::UNAVAILABLE);
- }
-
void ReplyWithYdbStatus(Ydb::StatusIds::StatusCode status) override {
TResponse* resp = CreateResponseMessage();
TCommonResponseFiller<TResponse, TDerived::IsOp>::Fill(*resp, IssueManager.GetIssues(), CostInfo, status);
FinishRequest();
Reply(resp, status);
+ if (Ctx_->IsStreamCall()) {
+ Ctx_->FinishStreamingOk();
+ }
}
TString GetPeerName() const override {
diff --git a/ydb/core/grpc_services/grpc_request_proxy.cpp b/ydb/core/grpc_services/grpc_request_proxy.cpp
index 6f5675c0595..18c99a16d95 100644
--- a/ydb/core/grpc_services/grpc_request_proxy.cpp
+++ b/ydb/core/grpc_services/grpc_request_proxy.cpp
@@ -169,7 +169,7 @@ private:
const TString error = "Unable to resolve token";
const auto issue = MakeIssue(NKikimrIssues::TIssuesIds::YDB_AUTH_UNAVAILABLE, error);
requestBaseCtx->RaiseIssue(issue);
- requestBaseCtx->ReplyUnavaliable();
+ requestBaseCtx->ReplyWithYdbStatus(Ydb::StatusIds::UNAVAILABLE);
requestBaseCtx->FinishSpan();
return;
}
@@ -213,7 +213,7 @@ private:
LOG_ERROR(ctx, NKikimrServices::GRPC_SERVER, "Limit for deferred events per database %s reached", databaseName.c_str());
const auto issue = MakeIssue(NKikimrIssues::TIssuesIds::YDB_DB_NOT_READY, error);
requestBaseCtx->RaiseIssue(issue);
- requestBaseCtx->ReplyUnavaliable();
+ requestBaseCtx->ReplyWithYdbStatus(Ydb::StatusIds::UNAVAILABLE);
requestBaseCtx->FinishSpan();
return;
}
@@ -295,7 +295,7 @@ private:
virtual void PassAway() override {
for (auto& [_, queue] : DeferredEvents) {
for (TEventReqHolder& req : queue) {
- req.Ctx->ReplyUnavaliable();
+ req.Ctx->ReplyWithYdbStatus(Ydb::StatusIds::UNAVAILABLE);
req.Ctx->FinishSpan();
}
}
diff --git a/ydb/core/grpc_services/grpc_request_proxy_simple.cpp b/ydb/core/grpc_services/grpc_request_proxy_simple.cpp
index b78fb779086..3ad80fbbf54 100644
--- a/ydb/core/grpc_services/grpc_request_proxy_simple.cpp
+++ b/ydb/core/grpc_services/grpc_request_proxy_simple.cpp
@@ -121,7 +121,7 @@ private:
const TString error = "Unable to resolve token";
const auto issue = MakeIssue(NKikimrIssues::TIssuesIds::YDB_AUTH_UNAVAILABLE, error);
requestBaseCtx->RaiseIssue(issue);
- requestBaseCtx->ReplyUnavaliable();
+ requestBaseCtx->ReplyWithYdbStatus(Ydb::StatusIds::UNAVAILABLE);
return;
}
diff --git a/ydb/core/grpc_services/local_grpc/local_grpc.h b/ydb/core/grpc_services/local_grpc/local_grpc.h
index 074c6fdbfc7..0e61dbf871d 100644
--- a/ydb/core/grpc_services/local_grpc/local_grpc.h
+++ b/ydb/core/grpc_services/local_grpc/local_grpc.h
@@ -76,6 +76,7 @@ public:
TString GetPeer() const override { return {}; }
bool SslServer() const override { return false; }
bool IsClientLost() const override { return false; }
+ bool IsStreamCall() const override { return false; }
public:
NYql::TIssues GetIssues() {
diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
index 8912fb1e8c1..fd3d8d31dfc 100644
--- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
@@ -261,8 +261,6 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
const TString query = "SELECT * FROM AS_TABLE(ListReplicate(AsStruct(\"12345678\" AS Key), 100000))";
-//TODO: it looks like this check triggers grpc request proxy request leak
-/*
{
// Check range for chunk size settings
auto settings = TExecuteQuerySettings().OutputChunkMaxSize(48_MB);
@@ -271,7 +269,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto streamPart = it.ReadNext().GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(streamPart.GetStatus(), EStatus::BAD_REQUEST, streamPart.GetIssues().ToString());
}
-*/
+
auto settings = TExecuteQuerySettings().OutputChunkMaxSize(10000);
auto it = db.StreamExecuteQuery(query, TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
diff --git a/ydb/core/public_http/grpc_request_context_wrapper.h b/ydb/core/public_http/grpc_request_context_wrapper.h
index e1bcf4d5f0f..e264582c3e9 100644
--- a/ydb/core/public_http/grpc_request_context_wrapper.h
+++ b/ydb/core/public_http/grpc_request_context_wrapper.h
@@ -48,6 +48,7 @@ public:
virtual bool IsClientLost() const { return false; }
virtual TString GetPeer() const { return {}; }
virtual bool SslServer() const { return false; }
+ virtual bool IsStreamCall() const { return false; }
};
} // namespace NKikimr::NPublicHttp
diff --git a/ydb/library/grpc/server/grpc_request.h b/ydb/library/grpc/server/grpc_request.h
index c1f249f36db..8a4774fcc2f 100644
--- a/ydb/library/grpc/server/grpc_request.h
+++ b/ydb/library/grpc/server/grpc_request.h
@@ -118,6 +118,10 @@ public:
return ClientLost_.load();
}
+ bool IsStreamCall() const override {
+ return bool(StreamAdaptor_);
+ }
+
TString GetPeer() const override {
// Decode URL-encoded square brackets
auto ip = TString(this->Context.peer());
diff --git a/ydb/library/grpc/server/grpc_request_base.h b/ydb/library/grpc/server/grpc_request_base.h
index e03fb57c356..2ad89993c85 100644
--- a/ydb/library/grpc/server/grpc_request_base.h
+++ b/ydb/library/grpc/server/grpc_request_base.h
@@ -112,6 +112,8 @@ public:
//! reply in flight
virtual void SetNextReplyCallback(TOnNextReply&& cb) = 0;
+ virtual bool IsStreamCall() const = 0;
+
//! Finish streaming reply
virtual void FinishStreamingOk() = 0;