aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordcherednik <dcherednik@ydb.tech>2023-07-11 17:59:13 +0300
committerdcherednik <dcherednik@ydb.tech>2023-07-11 17:59:13 +0300
commit81e79a6f17123610778c678234e72c182a538715 (patch)
tree6b32dffa97a2f7bd55abee5c990df1037b95d302
parent41effae1b14cbd91927d4d7746c935f773ee87ef (diff)
downloadydb-81e79a6f17123610778c678234e72c182a538715.tar.gz
Subscribe for intermidiate kqp node in case of no local session attach. KIKIMR-18250
-rw-r--r--ydb/core/grpc_services/query/rpc_attach_session.cpp1
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_service.cpp73
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h53
-rw-r--r--ydb/core/protos/kqp.proto1
-rw-r--r--ydb/services/ydb/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/services/ydb/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/services/ydb/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/services/ydb/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/services/ydb/ut/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/services/ydb/ut/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/services/ydb/ut/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/services/ydb/ut/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/services/ydb/ut/ya.make1
-rw-r--r--ydb/services/ydb/ut_helpers/CMakeLists.darwin-x86_64.txt22
-rw-r--r--ydb/services/ydb/ut_helpers/CMakeLists.linux-aarch64.txt23
-rw-r--r--ydb/services/ydb/ut_helpers/CMakeLists.linux-x86_64.txt23
-rw-r--r--ydb/services/ydb/ut_helpers/CMakeLists.txt17
-rw-r--r--ydb/services/ydb/ut_helpers/CMakeLists.windows-x86_64.txt22
-rw-r--r--ydb/services/ydb/ut_helpers/ut_helpers_query.cpp71
-rw-r--r--ydb/services/ydb/ut_helpers/ut_helpers_query.h12
-rw-r--r--ydb/services/ydb/ut_helpers/ya.make16
-rw-r--r--ydb/services/ydb/ydb_query_ut.cpp64
22 files changed, 337 insertions, 70 deletions
diff --git a/ydb/core/grpc_services/query/rpc_attach_session.cpp b/ydb/core/grpc_services/query/rpc_attach_session.cpp
index 65907d312cf..f3bd028a764 100644
--- a/ydb/core/grpc_services/query/rpc_attach_session.cpp
+++ b/ydb/core/grpc_services/query/rpc_attach_session.cpp
@@ -78,6 +78,7 @@ private:
if (CheckSession(sessionId, req)) {
ev->Record.MutableRequest()->SetSessionId(sessionId);
+ ev->Record.MutableRequest()->SetExtIdleCheck(true);
SessionId = sessionId;
} else {
return ReplyFinishStream(Ydb::StatusIds::BAD_REQUEST);
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
index 7ee094c6c39..499782a2fed 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
@@ -402,6 +402,11 @@ public:
if (BoardPublishActor) {
Send(BoardPublishActor, new TEvents::TEvPoison);
}
+
+ LocalSessions->ForEachNode([this](TNodeId node) {
+ Send(TActivationContext::InterconnectProxy(node), new TEvents::TEvUnsubscribe);
+ });
+
return TActor::PassAway();
}
@@ -664,20 +669,25 @@ public:
const auto traceId = event.GetTraceId();
TKqpRequestInfo requestInfo(traceId);
const auto sessionId = request.GetSessionId();
+ const bool extIdleCheck = request.GetExtIdleCheck();
const TKqpSessionInfo* sessionInfo = LocalSessions->FindPtr(sessionId);
auto dbCounters = sessionInfo ? sessionInfo->DbCounters : nullptr;
Counters->ReportPingSession(dbCounters, request.ByteSize());
- TActorId targetId;
// Local session
if (sessionInfo) {
- KQP_PROXY_LOG_D("Received ping session request, has local session, trace_id: " << traceId);
+ const bool sameNode = ev->Sender.NodeId() == SelfId().NodeId();
+ KQP_PROXY_LOG_D("Received ping session request, has local session: " << sessionId
+ << ", extIdleCheck: " << extIdleCheck
+ << ", sameNode: " << sameNode
+ << ", trace_id: " << traceId);
- targetId = sessionInfo->WorkerId;
const bool isIdle = LocalSessions->IsSessionIdle(sessionInfo);
if (isIdle) {
LocalSessions->StopIdleCheck(sessionInfo);
- LocalSessions->StartIdleCheck(sessionInfo, GetSessionIdleDuration());
+ if (!extIdleCheck) {
+ LocalSessions->StartIdleCheck(sessionInfo, GetSessionIdleDuration());
+ }
}
auto result = std::make_unique<TEvKqp::TEvPingSessionResponse>();
@@ -687,16 +697,33 @@ public:
? Ydb::Table::KeepAliveResult::SESSION_STATUS_READY
: Ydb::Table::KeepAliveResult::SESSION_STATUS_BUSY;
record.MutableResponse()->SetSessionStatus(sessionStatus);
- Send(ev->Sender, result.release(), 0, ev->Cookie);
+ if (extIdleCheck && isIdle) {
+ //TODO: fix
+ ui32 flags = IEventHandle::FlagTrackDelivery;
+ if (!sameNode) {
+ const TNodeId nodeId = ev->Sender.NodeId();
+ KQP_PROXY_LOG_T("Subscribe local session: " << sessionInfo->WorkerId
+ << " to remote: " << ev->Sender << " , nodeId: " << nodeId);
+
+ LocalSessions->AttachSession(sessionInfo, nodeId);
+
+ flags |= IEventHandle::FlagSubscribeOnSession;
+ }
+ Send(ev->Sender, result.release(), flags, ev->Cookie);
+ } else {
+ Send(ev->Sender, result.release(), 0, ev->Cookie);
+ }
return;
}
// Forward request to another proxy
ui64 requestId = PendingRequests.RegisterRequest(ev->Sender, ev->Cookie, traceId, TKqpEvents::EvPingSessionRequest);
- KQP_PROXY_LOG_D("Received ping session request, request_id: " << requestId << ", trace_id: " << traceId);
+ KQP_PROXY_LOG_D("Received ping session request, request_id: " << requestId
+ << ", sender: " << ev->Sender
+ << ", trace_id: " << traceId);
- targetId = TryGetSessionTargetActor(sessionId, requestInfo, requestId);
+ const TActorId targetId = TryGetSessionTargetActor(sessionId, requestInfo, requestId);
if (!targetId) {
return;
}
@@ -1172,6 +1199,8 @@ public:
hFunc(NKqp::TEvGetScriptExecutionOperation, Handle);
hFunc(NKqp::TEvListScriptExecutionOperations, Handle);
hFunc(NKqp::TEvCancelScriptExecutionOperation, Handle);
+ hFunc(TEvInterconnect::TEvNodeConnected, Handle);
+ hFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
default:
Y_FAIL("TKqpProxyService: unexpected event type: %" PRIx32 " event: %s",
ev->GetTypeRewrite(), ev->ToString().data());
@@ -1320,13 +1349,18 @@ private:
void RemoveSession(const TString& sessionId, const TActorId& workerId) {
if (!sessionId.empty()) {
- LocalSessions->Erase(sessionId);
+ auto nodeId = LocalSessions->Erase(sessionId);
KqpProxySharedResources->AtomicLocalSessionCount.store(LocalSessions->size());
PublishResourceUsage();
if (ShutdownRequested) {
ShutdownState->Update(LocalSessions->size());
}
+ // No more session with kqp proxy on this node
+ if (nodeId) {
+ Send(TActivationContext::InterconnectProxy(nodeId), new TEvents::TEvUnsubscribe);
+ }
+
return;
}
@@ -1385,6 +1419,29 @@ private:
Register(CreateCancelScriptExecutionOperationActor(std::move(ev)));
}
+ void Handle(TEvInterconnect::TEvNodeConnected::TPtr& ev) {
+ TNodeId nodeId = ev->Get()->NodeId;
+ auto sessions = LocalSessions->FindSessions(nodeId);
+ if (sessions) {
+ KQP_PROXY_LOG_T("Got TEvNodeConnected event from node: " << nodeId
+ << ", has " << sessions.size() << " sessions");
+ } else {
+ KQP_PROXY_LOG_E("Got TEvNodeConnected event from node without sessions: " << nodeId);
+ }
+ }
+
+ void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
+ TNodeId nodeId = ev->Get()->NodeId;
+ auto sessions = LocalSessions->FindSessions(nodeId);
+ KQP_PROXY_LOG_D("Node: " << nodeId << " disconnected, had " << sessions.size() << " sessions.");
+ const static auto IdleDurationAfterDisconnect = TDuration::Seconds(1);
+ // Just start standard idle check with small timeout
+ // It allows to use common code to close and delete expired session
+ for (const auto sessionInfo : sessions) {
+ LocalSessions->StartIdleCheck(sessionInfo, IdleDurationAfterDisconnect);
+ }
+ }
+
private:
NKikimrConfig::TLogConfig LogConfig;
NKikimrConfig::TTableServiceConfig TableServiceConfig;
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h b/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h
index 19f28f9a6cf..c74b210c05b 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h
@@ -12,6 +12,8 @@
namespace NKikimr::NKqp {
+using TNodeId = ui32;
+
struct TKqpProxyRequest {
TActorId Sender;
ui64 SenderCookie = 0;
@@ -83,6 +85,7 @@ struct TKqpSessionInfo {
NActors::TMonotonic IdleTimeout;
// position in the idle list.
std::list<TKqpSessionInfo*>::iterator IdlePos;
+ TNodeId AttachedNodeId;
TKqpSessionInfo(const TString& sessionId, const TActorId& workerId,
const TString& database, TKqpDbCountersPtr dbCounters, std::vector<i32>&& pos,
@@ -95,6 +98,7 @@ struct TKqpSessionInfo {
, ReadyPos(std::move(pos))
, IdleTimeout(idleTimeout)
, IdlePos(idlePos)
+ , AttachedNodeId(0)
{
}
};
@@ -107,6 +111,8 @@ class TLocalSessionsRegistry {
std::vector<std::vector<TString>> ReadySessions;
TIntrusivePtr<IRandomProvider> RandomProvider;
std::list<TKqpSessionInfo*> IdleSessions;
+ // map rpc node to local sessions
+ THashMap<TNodeId, THashSet<const TKqpSessionInfo*>> AttachedNodesIndex;
public:
TLocalSessionsRegistry(TIntrusivePtr<IRandomProvider> randomProvider)
@@ -114,6 +120,12 @@ public:
, RandomProvider(randomProvider)
{}
+ bool AttachSession(const TKqpSessionInfo* sessionInfo, TNodeId nodeId) {
+ const_cast<TKqpSessionInfo*>(sessionInfo)->AttachedNodeId = nodeId;
+ auto& actors = AttachedNodesIndex[nodeId];
+ return actors.insert(sessionInfo).second;
+ }
+
TKqpSessionInfo* Create(const TString& sessionId, const TActorId& workerId,
const TString& database, TKqpDbCountersPtr dbCounters, bool supportsBalancing,
TDuration idleDuration)
@@ -213,8 +225,9 @@ public:
return ShutdownInFlightSessions.size();
}
- void Erase(const TString& sessionId) {
+ TNodeId Erase(const TString& sessionId) {
auto it = LocalSessions.find(sessionId);
+ TNodeId result = 0;
if (it != LocalSessions.end()) {
auto counter = SessionsCountPerDatabase.find(it->second.Database);
if (counter != SessionsCountPerDatabase.end()) {
@@ -228,8 +241,22 @@ public:
RemoveSessionFromLists(&(it->second));
ShutdownInFlightSessions.erase(sessionId);
TargetIdIndex.erase(it->second.WorkerId);
+
+ if (const auto nodeId = it->second.AttachedNodeId) {
+ auto attIt = AttachedNodesIndex.find(nodeId);
+ if (attIt != AttachedNodesIndex.end()) {
+ attIt->second.erase(&(it->second));
+ if (attIt->second.empty()) {
+ result = nodeId;
+ AttachedNodesIndex.erase(attIt);
+ }
+ }
+ }
+
LocalSessions.erase(it);
}
+
+ return result;
}
bool IsPendingShutdown(const TString& sessionId) const {
@@ -257,10 +284,30 @@ public:
return LocalSessions.FindPtr(sessionId);
}
- void Erase(const TActorId& targetId) {
+ const THashSet<const TKqpSessionInfo*>& FindSessions(const TNodeId& nodeId) const {
+ auto it = AttachedNodesIndex.find(nodeId);
+ if (it == AttachedNodesIndex.end()) {
+ static THashSet<const TKqpSessionInfo*> empty;
+ return empty;
+ }
+ return it->second;
+ }
+
+ TNodeId Erase(const TActorId& targetId) {
+ TNodeId nodeId = 0;
+
auto it = TargetIdIndex.find(targetId);
if (it != TargetIdIndex.end()){
- Erase(it->second);
+ nodeId = Erase(it->second);
+ }
+
+ return nodeId;
+ }
+
+ template<typename TCb>
+ void ForEachNode(TCb&& cb) {
+ for (const auto& n : AttachedNodesIndex) {
+ cb(n.first);
}
}
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index 68cda11e5ec..dc2d84b1dce 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -325,6 +325,7 @@ message TEvCreateSessionResponse {
message TPingSessionRequest {
optional bytes SessionId = 1;
optional uint32 TimeoutMs = 2 [default = 5000];
+ optional bool ExtIdleCheck = 3; //It disables kqp proxy idle check
}
message TEvPingSessionRequest {
diff --git a/ydb/services/ydb/CMakeLists.darwin-x86_64.txt b/ydb/services/ydb/CMakeLists.darwin-x86_64.txt
index 2077daf05e3..4ef25c28ab7 100644
--- a/ydb/services/ydb/CMakeLists.darwin-x86_64.txt
+++ b/ydb/services/ydb/CMakeLists.darwin-x86_64.txt
@@ -13,6 +13,7 @@ add_subdirectory(sdk_sessions_pool_ut)
add_subdirectory(sdk_sessions_ut)
add_subdirectory(table_split_ut)
add_subdirectory(ut)
+add_subdirectory(ut_helpers)
add_library(ydb-services-ydb)
target_link_libraries(ydb-services-ydb PUBLIC
diff --git a/ydb/services/ydb/CMakeLists.linux-aarch64.txt b/ydb/services/ydb/CMakeLists.linux-aarch64.txt
index 3ab94530185..c634dc26935 100644
--- a/ydb/services/ydb/CMakeLists.linux-aarch64.txt
+++ b/ydb/services/ydb/CMakeLists.linux-aarch64.txt
@@ -13,6 +13,7 @@ add_subdirectory(sdk_sessions_pool_ut)
add_subdirectory(sdk_sessions_ut)
add_subdirectory(table_split_ut)
add_subdirectory(ut)
+add_subdirectory(ut_helpers)
add_library(ydb-services-ydb)
target_link_libraries(ydb-services-ydb PUBLIC
diff --git a/ydb/services/ydb/CMakeLists.linux-x86_64.txt b/ydb/services/ydb/CMakeLists.linux-x86_64.txt
index 3ab94530185..c634dc26935 100644
--- a/ydb/services/ydb/CMakeLists.linux-x86_64.txt
+++ b/ydb/services/ydb/CMakeLists.linux-x86_64.txt
@@ -13,6 +13,7 @@ add_subdirectory(sdk_sessions_pool_ut)
add_subdirectory(sdk_sessions_ut)
add_subdirectory(table_split_ut)
add_subdirectory(ut)
+add_subdirectory(ut_helpers)
add_library(ydb-services-ydb)
target_link_libraries(ydb-services-ydb PUBLIC
diff --git a/ydb/services/ydb/CMakeLists.windows-x86_64.txt b/ydb/services/ydb/CMakeLists.windows-x86_64.txt
index 2077daf05e3..4ef25c28ab7 100644
--- a/ydb/services/ydb/CMakeLists.windows-x86_64.txt
+++ b/ydb/services/ydb/CMakeLists.windows-x86_64.txt
@@ -13,6 +13,7 @@ add_subdirectory(sdk_sessions_pool_ut)
add_subdirectory(sdk_sessions_ut)
add_subdirectory(table_split_ut)
add_subdirectory(ut)
+add_subdirectory(ut_helpers)
add_library(ydb-services-ydb)
target_link_libraries(ydb-services-ydb PUBLIC
diff --git a/ydb/services/ydb/ut/CMakeLists.darwin-x86_64.txt b/ydb/services/ydb/ut/CMakeLists.darwin-x86_64.txt
index ca652056594..7d0164194eb 100644
--- a/ydb/services/ydb/ut/CMakeLists.darwin-x86_64.txt
+++ b/ydb/services/ydb/ut/CMakeLists.darwin-x86_64.txt
@@ -41,6 +41,7 @@ target_link_libraries(ydb-services-ydb-ut PUBLIC
cpp-client-ydb_operation
cpp-client-ydb_scheme
cpp-client-ydb_monitoring
+ services-ydb-ut_helpers
)
target_link_options(ydb-services-ydb-ut PRIVATE
-Wl,-platform_version,macos,11.0,11.0
diff --git a/ydb/services/ydb/ut/CMakeLists.linux-aarch64.txt b/ydb/services/ydb/ut/CMakeLists.linux-aarch64.txt
index 8f4a9f440b8..44cfa20b9da 100644
--- a/ydb/services/ydb/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/services/ydb/ut/CMakeLists.linux-aarch64.txt
@@ -41,6 +41,7 @@ target_link_libraries(ydb-services-ydb-ut PUBLIC
cpp-client-ydb_operation
cpp-client-ydb_scheme
cpp-client-ydb_monitoring
+ services-ydb-ut_helpers
)
target_link_options(ydb-services-ydb-ut PRIVATE
-ldl
diff --git a/ydb/services/ydb/ut/CMakeLists.linux-x86_64.txt b/ydb/services/ydb/ut/CMakeLists.linux-x86_64.txt
index 99673c83ec4..4888d5ec79f 100644
--- a/ydb/services/ydb/ut/CMakeLists.linux-x86_64.txt
+++ b/ydb/services/ydb/ut/CMakeLists.linux-x86_64.txt
@@ -42,6 +42,7 @@ target_link_libraries(ydb-services-ydb-ut PUBLIC
cpp-client-ydb_operation
cpp-client-ydb_scheme
cpp-client-ydb_monitoring
+ services-ydb-ut_helpers
)
target_link_options(ydb-services-ydb-ut PRIVATE
-ldl
diff --git a/ydb/services/ydb/ut/CMakeLists.windows-x86_64.txt b/ydb/services/ydb/ut/CMakeLists.windows-x86_64.txt
index f851c69d196..8e34646aa85 100644
--- a/ydb/services/ydb/ut/CMakeLists.windows-x86_64.txt
+++ b/ydb/services/ydb/ut/CMakeLists.windows-x86_64.txt
@@ -41,6 +41,7 @@ target_link_libraries(ydb-services-ydb-ut PUBLIC
cpp-client-ydb_operation
cpp-client-ydb_scheme
cpp-client-ydb_monitoring
+ services-ydb-ut_helpers
)
target_sources(ydb-services-ydb-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_bulk_upsert_ut.cpp
diff --git a/ydb/services/ydb/ut/ya.make b/ydb/services/ydb/ut/ya.make
index ebf7b4976a0..654e30ca365 100644
--- a/ydb/services/ydb/ut/ya.make
+++ b/ydb/services/ydb/ut/ya.make
@@ -54,6 +54,7 @@ PEERDIR(
ydb/public/sdk/cpp/client/ydb_scheme
ydb/public/sdk/cpp/client/ydb_monitoring
ydb/services/ydb
+ ydb/services/ydb/ut_helpers
)
YQL_LAST_ABI_VERSION()
diff --git a/ydb/services/ydb/ut_helpers/CMakeLists.darwin-x86_64.txt b/ydb/services/ydb/ut_helpers/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..5b672d97e57
--- /dev/null
+++ b/ydb/services/ydb/ut_helpers/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,22 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(services-ydb-ut_helpers)
+target_link_libraries(services-ydb-ut_helpers PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ api-grpc
+ api-grpc-draft
+ api-protos
+ api-protos-out
+ cpp-grpc-client
+)
+target_sources(services-ydb-ut_helpers PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ut_helpers/ut_helpers_query.cpp
+)
diff --git a/ydb/services/ydb/ut_helpers/CMakeLists.linux-aarch64.txt b/ydb/services/ydb/ut_helpers/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..51193f42857
--- /dev/null
+++ b/ydb/services/ydb/ut_helpers/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,23 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(services-ydb-ut_helpers)
+target_link_libraries(services-ydb-ut_helpers PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ api-grpc
+ api-grpc-draft
+ api-protos
+ api-protos-out
+ cpp-grpc-client
+)
+target_sources(services-ydb-ut_helpers PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ut_helpers/ut_helpers_query.cpp
+)
diff --git a/ydb/services/ydb/ut_helpers/CMakeLists.linux-x86_64.txt b/ydb/services/ydb/ut_helpers/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..51193f42857
--- /dev/null
+++ b/ydb/services/ydb/ut_helpers/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,23 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(services-ydb-ut_helpers)
+target_link_libraries(services-ydb-ut_helpers PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ api-grpc
+ api-grpc-draft
+ api-protos
+ api-protos-out
+ cpp-grpc-client
+)
+target_sources(services-ydb-ut_helpers PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ut_helpers/ut_helpers_query.cpp
+)
diff --git a/ydb/services/ydb/ut_helpers/CMakeLists.txt b/ydb/services/ydb/ut_helpers/CMakeLists.txt
new file mode 100644
index 00000000000..f8b31df0c11
--- /dev/null
+++ b/ydb/services/ydb/ut_helpers/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/services/ydb/ut_helpers/CMakeLists.windows-x86_64.txt b/ydb/services/ydb/ut_helpers/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..5b672d97e57
--- /dev/null
+++ b/ydb/services/ydb/ut_helpers/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,22 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(services-ydb-ut_helpers)
+target_link_libraries(services-ydb-ut_helpers PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ api-grpc
+ api-grpc-draft
+ api-protos
+ api-protos-out
+ cpp-grpc-client
+)
+target_sources(services-ydb-ut_helpers PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ut_helpers/ut_helpers_query.cpp
+)
diff --git a/ydb/services/ydb/ut_helpers/ut_helpers_query.cpp b/ydb/services/ydb/ut_helpers/ut_helpers_query.cpp
new file mode 100644
index 00000000000..0d67120823d
--- /dev/null
+++ b/ydb/services/ydb/ut_helpers/ut_helpers_query.cpp
@@ -0,0 +1,71 @@
+#include "ut_helpers_query.h"
+
+#include <ydb/public/api/grpc/ydb_query_v1.grpc.pb.h>
+#include <library/cpp/grpc/client/grpc_common.h>
+#include <library/cpp/grpc/client/grpc_client_low.h>
+#include <library/cpp/threading/future/future.h>
+
+#include <util/generic/string.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+using namespace NGrpc;
+
+namespace NTestHelpers {
+
+TString CreateQuerySession(const TGRpcClientConfig& clientConfig) {
+ NGrpc::TGRpcClientLow clientLow;
+ auto connection = clientLow.CreateGRpcServiceConnection<Ydb::Query::V1::QueryService>(clientConfig);
+
+ Ydb::Query::CreateSessionRequest request;
+ TString sessionId;
+
+ NGrpc::TResponseCallback<Ydb::Query::CreateSessionResponse> responseCb =
+ [&sessionId](NGrpc::TGrpcStatus&& grpcStatus, Ydb::Query::CreateSessionResponse&& response) -> void {
+ UNIT_ASSERT(!grpcStatus.InternalError);
+ UNIT_ASSERT_C(grpcStatus.GRpcStatusCode == 0, grpcStatus.Msg + " " + grpcStatus.Details);
+ UNIT_ASSERT_VALUES_EQUAL(response.status(), Ydb::StatusIds::SUCCESS);
+ UNIT_ASSERT(response.session_id() != "");
+ sessionId = response.session_id();
+ };
+
+ connection->DoRequest(request, std::move(responseCb), &Ydb::Query::V1::QueryService::Stub::AsyncCreateSession);
+
+ return sessionId;
+}
+
+using TProcessor = typename NGrpc::IStreamRequestReadProcessor<Ydb::Query::SessionState>::TPtr;
+void CheckAttach(const TGRpcClientConfig& clientConfig, const TString& id,
+ int code, bool& allDoneOk)
+{
+ const Ydb::StatusIds::StatusCode expected = static_cast<Ydb::StatusIds::StatusCode>(code);
+ NGrpc::TGRpcClientLow clientLow;
+ auto connection = clientLow.CreateGRpcServiceConnection<Ydb::Query::V1::QueryService>(clientConfig);
+
+ Ydb::Query::AttachSessionRequest request;
+ request.set_session_id(id);
+
+ auto promise = NThreading::NewPromise<TProcessor>();
+ auto cb = [&allDoneOk, promise, expected](TGrpcStatus grpcStatus, TProcessor processor) mutable {
+ UNIT_ASSERT(grpcStatus.GRpcStatusCode == grpc::StatusCode::OK);
+ auto resp = std::make_shared<Ydb::Query::SessionState>();
+ processor->Read(resp.get(), [&allDoneOk, resp, promise, processor, expected](TGrpcStatus grpcStatus) mutable {
+ UNIT_ASSERT(grpcStatus.GRpcStatusCode == grpc::StatusCode::OK);
+ allDoneOk &= (resp->status() == expected);
+ if (!allDoneOk) {
+ Cerr << "Got attach response: " << resp->DebugString() << Endl;
+ }
+ promise.SetValue(processor);
+ });
+ };
+
+ connection->DoStreamRequest<Ydb::Query::AttachSessionRequest, Ydb::Query::SessionState>(
+ request,
+ cb,
+ &Ydb::Query::V1::QueryService::Stub::AsyncAttachSession);
+
+ auto provider = promise.GetFuture().GetValueSync();
+ provider->Cancel();
+}
+
+}
diff --git a/ydb/services/ydb/ut_helpers/ut_helpers_query.h b/ydb/services/ydb/ut_helpers/ut_helpers_query.h
new file mode 100644
index 00000000000..bdeef075eab
--- /dev/null
+++ b/ydb/services/ydb/ut_helpers/ut_helpers_query.h
@@ -0,0 +1,12 @@
+#pragma once
+
+#include <util/generic/fwd.h>
+#include <library/cpp/grpc/client/grpc_client_low.h>
+
+namespace NTestHelpers {
+
+TString CreateQuerySession(const NGrpc::TGRpcClientConfig& clientConfig);
+void CheckAttach(const NGrpc::TGRpcClientConfig& clientConfig, const TString& id,
+ int expected, bool& allDoneOk);
+
+}
diff --git a/ydb/services/ydb/ut_helpers/ya.make b/ydb/services/ydb/ut_helpers/ya.make
new file mode 100644
index 00000000000..9047fbebd1b
--- /dev/null
+++ b/ydb/services/ydb/ut_helpers/ya.make
@@ -0,0 +1,16 @@
+LIBRARY()
+
+SRCS(
+ ut_helpers_query.cpp
+)
+
+PEERDIR(
+ ydb/public/api/grpc
+ ydb/public/api/grpc/draft
+ ydb/public/api/protos
+ ydb/public/api/protos/out
+ library/cpp/grpc/client
+)
+
+END()
+
diff --git a/ydb/services/ydb/ydb_query_ut.cpp b/ydb/services/ydb/ydb_query_ut.cpp
index 46acf56dfb2..fb248d5e9c9 100644
--- a/ydb/services/ydb/ydb_query_ut.cpp
+++ b/ydb/services/ydb/ydb_query_ut.cpp
@@ -1,67 +1,13 @@
#include "ydb_common_ut.h"
+#include "ut_helpers/ut_helpers_query.h"
#include <ydb/public/api/grpc/ydb_query_v1.grpc.pb.h>
+#include <library/cpp/testing/unittest/tests_data.h>
+
using namespace NYdb;
using namespace NGrpc;
-
-namespace {
-
-TString CreateSession(const NGRpcProxy::TGRpcClientConfig& clientConfig) {
- NGrpc::TGRpcClientLow clientLow;
- auto connection = clientLow.CreateGRpcServiceConnection<Ydb::Query::V1::QueryService>(clientConfig);
-
- Ydb::Query::CreateSessionRequest request;
- TString sessionId;
-
- NGrpc::TResponseCallback<Ydb::Query::CreateSessionResponse> responseCb =
- [&sessionId](NGrpc::TGrpcStatus&& grpcStatus, Ydb::Query::CreateSessionResponse&& response) -> void {
- UNIT_ASSERT(!grpcStatus.InternalError);
- UNIT_ASSERT(grpcStatus.GRpcStatusCode == 0);
- UNIT_ASSERT_VALUES_EQUAL(response.status(), Ydb::StatusIds::SUCCESS);
- UNIT_ASSERT(response.session_id() != "");
- sessionId = response.session_id();
- };
-
- connection->DoRequest(request, std::move(responseCb), &Ydb::Query::V1::QueryService::Stub::AsyncCreateSession);
-
- return sessionId;
-}
-
-using TProcessor = typename NGrpc::IStreamRequestReadProcessor<Ydb::Query::SessionState>::TPtr;
-void CheckAttach(const NGRpcProxy::TGRpcClientConfig& clientConfig, const TString& id,
- const Ydb::StatusIds::StatusCode expected, bool& allDoneOk)
-{
- NGrpc::TGRpcClientLow clientLow;
- auto connection = clientLow.CreateGRpcServiceConnection<Ydb::Query::V1::QueryService>(clientConfig);
-
- Ydb::Query::AttachSessionRequest request;
- request.set_session_id(id);
-
- auto promise = NThreading::NewPromise<TProcessor>();
- auto cb = [&allDoneOk, promise, expected](TGrpcStatus grpcStatus, TProcessor processor) mutable {
- UNIT_ASSERT(grpcStatus.GRpcStatusCode == grpc::StatusCode::OK);
- auto resp = std::make_shared<Ydb::Query::SessionState>();
- processor->Read(resp.get(), [&allDoneOk, resp, promise, processor, expected](TGrpcStatus grpcStatus) mutable {
- UNIT_ASSERT(grpcStatus.GRpcStatusCode == grpc::StatusCode::OK);
- allDoneOk &= (resp->status() == expected);
- if (!allDoneOk) {
- Cerr << "Got attach response: " << resp->DebugString() << Endl;
- }
- promise.SetValue(processor);
- });
- };
-
- connection->DoStreamRequest<Ydb::Query::AttachSessionRequest, Ydb::Query::SessionState>(
- request,
- cb,
- &Ydb::Query::V1::QueryService::Stub::AsyncAttachSession);
-
- auto processor = promise.GetFuture().GetValueSync();
- processor->Cancel();
-}
-
-}
+using namespace NTestHelpers;
Y_UNIT_TEST_SUITE(YdbQueryService) {
Y_UNIT_TEST(TestCreateAndAttachSession) {
@@ -73,7 +19,7 @@ Y_UNIT_TEST_SUITE(YdbQueryService) {
auto clientConfig = NGRpcProxy::TGRpcClientConfig(location);
bool allDoneOk = true;
- TString sessionId = CreateSession(clientConfig);
+ TString sessionId = CreateQuerySession(clientConfig);
UNIT_ASSERT(sessionId);