summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordcherednik <[email protected]>2023-07-19 13:45:44 +0300
committerdcherednik <[email protected]>2023-07-19 13:45:44 +0300
commitb03dce345d49158a866df832fc0e4cd257893146 (patch)
treed5ec30e2b78ff1cef7cc9c3fd50c56791f3c6b62
parente3dee9bc4c69b09540ef88df8b4d8f988d8b58b2 (diff)
Check the kqp session is destroyed during intermediate node stuck. KIKIMR-18250
-rw-r--r--library/cpp/grpc/client/grpc_client_low.h13
-rw-r--r--ydb/services/ydb/ut_helpers/ut_helpers_query.cpp17
-rw-r--r--ydb/services/ydb/ut_helpers/ut_helpers_query.h7
-rw-r--r--ydb/tests/library/harness/resources/default_yaml.yml1
4 files changed, 25 insertions, 13 deletions
diff --git a/library/cpp/grpc/client/grpc_client_low.h b/library/cpp/grpc/client/grpc_client_low.h
index bf0f8e0f9f2..037463cb78c 100644
--- a/library/cpp/grpc/client/grpc_client_low.h
+++ b/library/cpp/grpc/client/grpc_client_low.h
@@ -387,16 +387,21 @@ private:
bool Replied_ = false;
};
-template<class TResponse>
-class IStreamRequestReadProcessor : public TThrRefBase {
+class IStreamRequestCtrl : public TThrRefBase {
public:
- using TPtr = TIntrusivePtr<IStreamRequestReadProcessor>;
- using TReadCallback = std::function<void(TGrpcStatus&&)>;
+ using TPtr = TIntrusivePtr<IStreamRequestCtrl>;
/**
* Asynchronously cancel the request
*/
virtual void Cancel() = 0;
+};
+
+template<class TResponse>
+class IStreamRequestReadProcessor : public IStreamRequestCtrl {
+public:
+ using TPtr = TIntrusivePtr<IStreamRequestReadProcessor>;
+ using TReadCallback = std::function<void(TGrpcStatus&&)>;
/**
* Scheduled initial server metadata read from the stream
diff --git a/ydb/services/ydb/ut_helpers/ut_helpers_query.cpp b/ydb/services/ydb/ut_helpers/ut_helpers_query.cpp
index 0d67120823d..200344fc29c 100644
--- a/ydb/services/ydb/ut_helpers/ut_helpers_query.cpp
+++ b/ydb/services/ydb/ut_helpers/ut_helpers_query.cpp
@@ -34,17 +34,16 @@ TString CreateQuerySession(const TGRpcClientConfig& clientConfig) {
return sessionId;
}
-using TProcessor = typename NGrpc::IStreamRequestReadProcessor<Ydb::Query::SessionState>::TPtr;
-void CheckAttach(const TGRpcClientConfig& clientConfig, const TString& id,
- int code, bool& allDoneOk)
+NGrpc::IStreamRequestCtrl::TPtr CheckAttach(NGrpc::TGRpcClientLow& clientLow, const TGRpcClientConfig& clientConfig,
+ const TString& id, int code, bool& allDoneOk)
{
const Ydb::StatusIds::StatusCode expected = static_cast<Ydb::StatusIds::StatusCode>(code);
- NGrpc::TGRpcClientLow clientLow;
auto connection = clientLow.CreateGRpcServiceConnection<Ydb::Query::V1::QueryService>(clientConfig);
Ydb::Query::AttachSessionRequest request;
request.set_session_id(id);
+ using TProcessor = typename NGrpc::IStreamRequestReadProcessor<Ydb::Query::SessionState>::TPtr;
auto promise = NThreading::NewPromise<TProcessor>();
auto cb = [&allDoneOk, promise, expected](TGrpcStatus grpcStatus, TProcessor processor) mutable {
UNIT_ASSERT(grpcStatus.GRpcStatusCode == grpc::StatusCode::OK);
@@ -53,7 +52,7 @@ void CheckAttach(const TGRpcClientConfig& clientConfig, const TString& id,
UNIT_ASSERT(grpcStatus.GRpcStatusCode == grpc::StatusCode::OK);
allDoneOk &= (resp->status() == expected);
if (!allDoneOk) {
- Cerr << "Got attach response: " << resp->DebugString() << Endl;
+ Cerr << "Expected status: " << expected << ", got response: " << resp->DebugString() << Endl;
}
promise.SetValue(processor);
});
@@ -64,8 +63,12 @@ void CheckAttach(const TGRpcClientConfig& clientConfig, const TString& id,
cb,
&Ydb::Query::V1::QueryService::Stub::AsyncAttachSession);
- auto provider = promise.GetFuture().GetValueSync();
- provider->Cancel();
+ return promise.GetFuture().GetValueSync();
+}
+
+void CheckAttach(const TGRpcClientConfig& clientConfig, const TString& id, int code, bool& allDoneOk) {
+ NGrpc::TGRpcClientLow clientLow;
+ CheckAttach(clientLow, clientConfig, id, code, allDoneOk)->Cancel();
}
}
diff --git a/ydb/services/ydb/ut_helpers/ut_helpers_query.h b/ydb/services/ydb/ut_helpers/ut_helpers_query.h
index bdeef075eab..d6474002215 100644
--- a/ydb/services/ydb/ut_helpers/ut_helpers_query.h
+++ b/ydb/services/ydb/ut_helpers/ut_helpers_query.h
@@ -6,7 +6,10 @@
namespace NTestHelpers {
TString CreateQuerySession(const NGrpc::TGRpcClientConfig& clientConfig);
-void CheckAttach(const NGrpc::TGRpcClientConfig& clientConfig, const TString& id,
- int expected, bool& allDoneOk);
+
+NGrpc::IStreamRequestCtrl::TPtr CheckAttach(NGrpc::TGRpcClientLow& clientLow, const NGrpc::TGRpcClientConfig& clientConfig,
+ const TString& id, int code, bool& allDoneOk);
+
+void CheckAttach(const NGrpc::TGRpcClientConfig& clientConfig, const TString& id, int expected, bool& allDoneOk);
}
diff --git a/ydb/tests/library/harness/resources/default_yaml.yml b/ydb/tests/library/harness/resources/default_yaml.yml
index 76c651040dd..7090fad7ded 100644
--- a/ydb/tests/library/harness/resources/default_yaml.yml
+++ b/ydb/tests/library/harness/resources/default_yaml.yml
@@ -154,6 +154,7 @@ grpc_config:
- "keyvalue"
- "monitoring"
- "auth"
+ - "query_service"
feature_flags:
enable_persistent_query_stats: true
enable_scheme_transactions_at_scheme_shard: true