diff options
author | robdrynkin <robdrynkin@yandex-team.com> | 2023-06-15 10:00:02 +0300 |
---|---|---|
committer | robdrynkin <robdrynkin@yandex-team.com> | 2023-06-15 10:00:02 +0300 |
commit | 0e34603dddbc6fb0ef7d992aa729fb19bcd1d047 (patch) | |
tree | 054dd99212b2dbd94a01508285797f0a8e4ee9b0 | |
parent | 497eba2b257fe1d27eebbc6c5ada5652a58b8436 (diff) | |
download | ydb-0e34603dddbc6fb0ef7d992aa729fb19bcd1d047.tar.gz |
Query service handle client lost
-rw-r--r-- | library/cpp/grpc/server/grpc_server.cpp | 4 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_server.h | 2 | ||||
-rw-r--r-- | ydb/core/grpc_services/query/rpc_execute_query.cpp | 10 | ||||
-rw-r--r-- | ydb/core/kqp/ut/service/kqp_query_service_ut.cpp | 39 | ||||
-rw-r--r-- | ydb/core/testlib/test_client.cpp | 5 | ||||
-rw-r--r-- | ydb/core/testlib/test_client.h | 1 |
6 files changed, 59 insertions, 2 deletions
diff --git a/library/cpp/grpc/server/grpc_server.cpp b/library/cpp/grpc/server/grpc_server.cpp index ed69142882..9ba7d29659 100644 --- a/library/cpp/grpc/server/grpc_server.cpp +++ b/library/cpp/grpc/server/grpc_server.cpp @@ -244,4 +244,8 @@ TString TGRpcServer::GetHost() const { return Options_.Host; } +const TVector<TGRpcServer::IGRpcServicePtr>& TGRpcServer::GetServices() const { + return Services_; +} + } // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_server.h b/library/cpp/grpc/server/grpc_server.h index a571509caa..fc1826b922 100644 --- a/library/cpp/grpc/server/grpc_server.h +++ b/library/cpp/grpc/server/grpc_server.h @@ -387,6 +387,8 @@ public: ui16 GetPort() const; TString GetHost() const; + const TVector<IGRpcServicePtr>& GetServices() const; + private: using IThreadRef = TAutoPtr<IThreadFactory::IThread>; diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp index 149e6061b8..48fed86dd1 100644 --- a/ydb/core/grpc_services/query/rpc_execute_query.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp @@ -389,8 +389,14 @@ private: private: void HandleClientLost(const TActorContext& ctx) { - // TODO: Abort query execution. - Y_UNUSED(ctx); + LOG_WARN_S(ctx, NKikimrServices::RPC_REQUEST, "Client lost"); + + // We must try to finish stream otherwise grpc will not free allocated memory + // If stream already scheduled to be finished (ReplyFinishStream already called) + // this call do nothing but Die will be called after reply to grpc + auto issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, + "Client should not see this message, if so... may the force be with you"); + ReplyFinishStream(Ydb::StatusIds::INTERNAL_ERROR, issue); } void ReplyFinishStream(Ydb::StatusIds::StatusCode status, const NYql::TIssue& issue) { diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp index 86b597ba16..b66916f3db 100644 --- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp @@ -1,3 +1,4 @@ +#include <ydb/core/kqp/counters/kqp_counters.h> #include <ydb/core/kqp/ut/common/kqp_ut_common.h> #include <ydb/public/sdk/cpp/client/ydb_operation/operation.h> #include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> @@ -360,6 +361,44 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { auto cancelStatus = opClient.Cancel(scriptExecutionOperation.Id()).ExtractValueSync(); UNIT_ASSERT_C(cancelStatus.GetStatus() == NYdb::EStatus::PRECONDITION_FAILED, cancelStatus.GetIssues().ToString()); } + + Y_UNIT_TEST(CloseConnection) { + auto kikimr = DefaultKikimrRunner(); + + NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters); + + int maxTimeoutMs = 100; + + for (int i = 1; i < maxTimeoutMs; i++) { + auto it = kikimr.GetQueryClient().StreamExecuteQuery(R"( + SELECT * FROM `/Root/EightShard` WHERE Text = "Value1" ORDER BY Key; + )", TTxControl::BeginTx().CommitTx(), TExecuteQuerySettings().ClientTimeout(TDuration::MilliSeconds(i))).GetValueSync(); + + if (it.IsSuccess()) { + try { + for (;;) { + auto streamPart = it.ReadNext().GetValueSync(); + if (!streamPart.IsSuccess()) { + break; + } + } + } catch (const TStreamReadError& ex) { + UNIT_ASSERT_VALUES_EQUAL(ex.Status, NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED); + } catch (const std::exception& ex) { + UNIT_ASSERT_C(false, "unknown exception during the test"); + } + } else { + UNIT_ASSERT_VALUES_EQUAL(it.GetStatus(), NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED); + } + } + + WaitForZeroSessions(counters); + + for (const auto& service: kikimr.GetTestServer().GetGRpcServer().GetServices()) { + UNIT_ASSERT_VALUES_EQUAL(service->RequestsInProgress(), 0); + UNIT_ASSERT(!service->IsUnsafeToShutdown()); + } + } } } // namespace NKqp diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 741b9c297e..cb5c8dfa0c 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -1074,6 +1074,11 @@ namespace Tests { return *Driver; } + const NGrpc::TGRpcServer& TServer::GetGRpcServer() const { + Y_VERIFY(GRpcServer); + return *GRpcServer; + } + TServer::~TServer() { if (GRpcServer) { GRpcServer->Stop(); diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h index 5cb51220fa..6983bdfd6a 100644 --- a/ydb/core/testlib/test_client.h +++ b/ydb/core/testlib/test_client.h @@ -274,6 +274,7 @@ namespace Tests { const NScheme::TTypeRegistry* GetTypeRegistry(); const NMiniKQL::IFunctionRegistry* GetFunctionRegistry(); const NYdb::TDriver& GetDriver() const; + const NGrpc::TGRpcServer& GetGRpcServer() const; ui32 StaticNodes() const { return Settings->NodeCount; |