aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobdrynkin <robdrynkin@yandex-team.com>2023-06-15 10:00:02 +0300
committerrobdrynkin <robdrynkin@yandex-team.com>2023-06-15 10:00:02 +0300
commit0e34603dddbc6fb0ef7d992aa729fb19bcd1d047 (patch)
tree054dd99212b2dbd94a01508285797f0a8e4ee9b0
parent497eba2b257fe1d27eebbc6c5ada5652a58b8436 (diff)
downloadydb-0e34603dddbc6fb0ef7d992aa729fb19bcd1d047.tar.gz
Query service handle client lost
-rw-r--r--library/cpp/grpc/server/grpc_server.cpp4
-rw-r--r--library/cpp/grpc/server/grpc_server.h2
-rw-r--r--ydb/core/grpc_services/query/rpc_execute_query.cpp10
-rw-r--r--ydb/core/kqp/ut/service/kqp_query_service_ut.cpp39
-rw-r--r--ydb/core/testlib/test_client.cpp5
-rw-r--r--ydb/core/testlib/test_client.h1
6 files changed, 59 insertions, 2 deletions
diff --git a/library/cpp/grpc/server/grpc_server.cpp b/library/cpp/grpc/server/grpc_server.cpp
index ed69142882..9ba7d29659 100644
--- a/library/cpp/grpc/server/grpc_server.cpp
+++ b/library/cpp/grpc/server/grpc_server.cpp
@@ -244,4 +244,8 @@ TString TGRpcServer::GetHost() const {
return Options_.Host;
}
+const TVector<TGRpcServer::IGRpcServicePtr>& TGRpcServer::GetServices() const {
+ return Services_;
+}
+
} // namespace NGrpc
diff --git a/library/cpp/grpc/server/grpc_server.h b/library/cpp/grpc/server/grpc_server.h
index a571509caa..fc1826b922 100644
--- a/library/cpp/grpc/server/grpc_server.h
+++ b/library/cpp/grpc/server/grpc_server.h
@@ -387,6 +387,8 @@ public:
ui16 GetPort() const;
TString GetHost() const;
+ const TVector<IGRpcServicePtr>& GetServices() const;
+
private:
using IThreadRef = TAutoPtr<IThreadFactory::IThread>;
diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp
index 149e6061b8..48fed86dd1 100644
--- a/ydb/core/grpc_services/query/rpc_execute_query.cpp
+++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp
@@ -389,8 +389,14 @@ private:
private:
void HandleClientLost(const TActorContext& ctx) {
- // TODO: Abort query execution.
- Y_UNUSED(ctx);
+ LOG_WARN_S(ctx, NKikimrServices::RPC_REQUEST, "Client lost");
+
+ // We must try to finish stream otherwise grpc will not free allocated memory
+ // If stream already scheduled to be finished (ReplyFinishStream already called)
+ // this call do nothing but Die will be called after reply to grpc
+ auto issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR,
+ "Client should not see this message, if so... may the force be with you");
+ ReplyFinishStream(Ydb::StatusIds::INTERNAL_ERROR, issue);
}
void ReplyFinishStream(Ydb::StatusIds::StatusCode status, const NYql::TIssue& issue) {
diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
index 86b597ba16..b66916f3db 100644
--- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
@@ -1,3 +1,4 @@
+#include <ydb/core/kqp/counters/kqp_counters.h>
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
@@ -360,6 +361,44 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto cancelStatus = opClient.Cancel(scriptExecutionOperation.Id()).ExtractValueSync();
UNIT_ASSERT_C(cancelStatus.GetStatus() == NYdb::EStatus::PRECONDITION_FAILED, cancelStatus.GetIssues().ToString());
}
+
+ Y_UNIT_TEST(CloseConnection) {
+ auto kikimr = DefaultKikimrRunner();
+
+ NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
+
+ int maxTimeoutMs = 100;
+
+ for (int i = 1; i < maxTimeoutMs; i++) {
+ auto it = kikimr.GetQueryClient().StreamExecuteQuery(R"(
+ SELECT * FROM `/Root/EightShard` WHERE Text = "Value1" ORDER BY Key;
+ )", TTxControl::BeginTx().CommitTx(), TExecuteQuerySettings().ClientTimeout(TDuration::MilliSeconds(i))).GetValueSync();
+
+ if (it.IsSuccess()) {
+ try {
+ for (;;) {
+ auto streamPart = it.ReadNext().GetValueSync();
+ if (!streamPart.IsSuccess()) {
+ break;
+ }
+ }
+ } catch (const TStreamReadError& ex) {
+ UNIT_ASSERT_VALUES_EQUAL(ex.Status, NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED);
+ } catch (const std::exception& ex) {
+ UNIT_ASSERT_C(false, "unknown exception during the test");
+ }
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(it.GetStatus(), NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED);
+ }
+ }
+
+ WaitForZeroSessions(counters);
+
+ for (const auto& service: kikimr.GetTestServer().GetGRpcServer().GetServices()) {
+ UNIT_ASSERT_VALUES_EQUAL(service->RequestsInProgress(), 0);
+ UNIT_ASSERT(!service->IsUnsafeToShutdown());
+ }
+ }
}
} // namespace NKqp
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index 741b9c297e..cb5c8dfa0c 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -1074,6 +1074,11 @@ namespace Tests {
return *Driver;
}
+ const NGrpc::TGRpcServer& TServer::GetGRpcServer() const {
+ Y_VERIFY(GRpcServer);
+ return *GRpcServer;
+ }
+
TServer::~TServer() {
if (GRpcServer) {
GRpcServer->Stop();
diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h
index 5cb51220fa..6983bdfd6a 100644
--- a/ydb/core/testlib/test_client.h
+++ b/ydb/core/testlib/test_client.h
@@ -274,6 +274,7 @@ namespace Tests {
const NScheme::TTypeRegistry* GetTypeRegistry();
const NMiniKQL::IFunctionRegistry* GetFunctionRegistry();
const NYdb::TDriver& GetDriver() const;
+ const NGrpc::TGRpcServer& GetGRpcServer() const;
ui32 StaticNodes() const {
return Settings->NodeCount;