diff options
author | dcherednik <dcherednik@ydb.tech> | 2023-07-11 17:59:13 +0300 |
---|---|---|
committer | dcherednik <dcherednik@ydb.tech> | 2023-07-11 17:59:13 +0300 |
commit | 81e79a6f17123610778c678234e72c182a538715 (patch) | |
tree | 6b32dffa97a2f7bd55abee5c990df1037b95d302 | |
parent | 41effae1b14cbd91927d4d7746c935f773ee87ef (diff) | |
download | ydb-81e79a6f17123610778c678234e72c182a538715.tar.gz |
Subscribe for intermidiate kqp node in case of no local session attach. KIKIMR-18250
22 files changed, 337 insertions, 70 deletions
diff --git a/ydb/core/grpc_services/query/rpc_attach_session.cpp b/ydb/core/grpc_services/query/rpc_attach_session.cpp index 65907d312cf..f3bd028a764 100644 --- a/ydb/core/grpc_services/query/rpc_attach_session.cpp +++ b/ydb/core/grpc_services/query/rpc_attach_session.cpp @@ -78,6 +78,7 @@ private: if (CheckSession(sessionId, req)) { ev->Record.MutableRequest()->SetSessionId(sessionId); + ev->Record.MutableRequest()->SetExtIdleCheck(true); SessionId = sessionId; } else { return ReplyFinishStream(Ydb::StatusIds::BAD_REQUEST); diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 7ee094c6c39..499782a2fed 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -402,6 +402,11 @@ public: if (BoardPublishActor) { Send(BoardPublishActor, new TEvents::TEvPoison); } + + LocalSessions->ForEachNode([this](TNodeId node) { + Send(TActivationContext::InterconnectProxy(node), new TEvents::TEvUnsubscribe); + }); + return TActor::PassAway(); } @@ -664,20 +669,25 @@ public: const auto traceId = event.GetTraceId(); TKqpRequestInfo requestInfo(traceId); const auto sessionId = request.GetSessionId(); + const bool extIdleCheck = request.GetExtIdleCheck(); const TKqpSessionInfo* sessionInfo = LocalSessions->FindPtr(sessionId); auto dbCounters = sessionInfo ? sessionInfo->DbCounters : nullptr; Counters->ReportPingSession(dbCounters, request.ByteSize()); - TActorId targetId; // Local session if (sessionInfo) { - KQP_PROXY_LOG_D("Received ping session request, has local session, trace_id: " << traceId); + const bool sameNode = ev->Sender.NodeId() == SelfId().NodeId(); + KQP_PROXY_LOG_D("Received ping session request, has local session: " << sessionId + << ", extIdleCheck: " << extIdleCheck + << ", sameNode: " << sameNode + << ", trace_id: " << traceId); - targetId = sessionInfo->WorkerId; const bool isIdle = LocalSessions->IsSessionIdle(sessionInfo); if (isIdle) { LocalSessions->StopIdleCheck(sessionInfo); - LocalSessions->StartIdleCheck(sessionInfo, GetSessionIdleDuration()); + if (!extIdleCheck) { + LocalSessions->StartIdleCheck(sessionInfo, GetSessionIdleDuration()); + } } auto result = std::make_unique<TEvKqp::TEvPingSessionResponse>(); @@ -687,16 +697,33 @@ public: ? Ydb::Table::KeepAliveResult::SESSION_STATUS_READY : Ydb::Table::KeepAliveResult::SESSION_STATUS_BUSY; record.MutableResponse()->SetSessionStatus(sessionStatus); - Send(ev->Sender, result.release(), 0, ev->Cookie); + if (extIdleCheck && isIdle) { + //TODO: fix + ui32 flags = IEventHandle::FlagTrackDelivery; + if (!sameNode) { + const TNodeId nodeId = ev->Sender.NodeId(); + KQP_PROXY_LOG_T("Subscribe local session: " << sessionInfo->WorkerId + << " to remote: " << ev->Sender << " , nodeId: " << nodeId); + + LocalSessions->AttachSession(sessionInfo, nodeId); + + flags |= IEventHandle::FlagSubscribeOnSession; + } + Send(ev->Sender, result.release(), flags, ev->Cookie); + } else { + Send(ev->Sender, result.release(), 0, ev->Cookie); + } return; } // Forward request to another proxy ui64 requestId = PendingRequests.RegisterRequest(ev->Sender, ev->Cookie, traceId, TKqpEvents::EvPingSessionRequest); - KQP_PROXY_LOG_D("Received ping session request, request_id: " << requestId << ", trace_id: " << traceId); + KQP_PROXY_LOG_D("Received ping session request, request_id: " << requestId + << ", sender: " << ev->Sender + << ", trace_id: " << traceId); - targetId = TryGetSessionTargetActor(sessionId, requestInfo, requestId); + const TActorId targetId = TryGetSessionTargetActor(sessionId, requestInfo, requestId); if (!targetId) { return; } @@ -1172,6 +1199,8 @@ public: hFunc(NKqp::TEvGetScriptExecutionOperation, Handle); hFunc(NKqp::TEvListScriptExecutionOperations, Handle); hFunc(NKqp::TEvCancelScriptExecutionOperation, Handle); + hFunc(TEvInterconnect::TEvNodeConnected, Handle); + hFunc(TEvInterconnect::TEvNodeDisconnected, Handle); default: Y_FAIL("TKqpProxyService: unexpected event type: %" PRIx32 " event: %s", ev->GetTypeRewrite(), ev->ToString().data()); @@ -1320,13 +1349,18 @@ private: void RemoveSession(const TString& sessionId, const TActorId& workerId) { if (!sessionId.empty()) { - LocalSessions->Erase(sessionId); + auto nodeId = LocalSessions->Erase(sessionId); KqpProxySharedResources->AtomicLocalSessionCount.store(LocalSessions->size()); PublishResourceUsage(); if (ShutdownRequested) { ShutdownState->Update(LocalSessions->size()); } + // No more session with kqp proxy on this node + if (nodeId) { + Send(TActivationContext::InterconnectProxy(nodeId), new TEvents::TEvUnsubscribe); + } + return; } @@ -1385,6 +1419,29 @@ private: Register(CreateCancelScriptExecutionOperationActor(std::move(ev))); } + void Handle(TEvInterconnect::TEvNodeConnected::TPtr& ev) { + TNodeId nodeId = ev->Get()->NodeId; + auto sessions = LocalSessions->FindSessions(nodeId); + if (sessions) { + KQP_PROXY_LOG_T("Got TEvNodeConnected event from node: " << nodeId + << ", has " << sessions.size() << " sessions"); + } else { + KQP_PROXY_LOG_E("Got TEvNodeConnected event from node without sessions: " << nodeId); + } + } + + void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) { + TNodeId nodeId = ev->Get()->NodeId; + auto sessions = LocalSessions->FindSessions(nodeId); + KQP_PROXY_LOG_D("Node: " << nodeId << " disconnected, had " << sessions.size() << " sessions."); + const static auto IdleDurationAfterDisconnect = TDuration::Seconds(1); + // Just start standard idle check with small timeout + // It allows to use common code to close and delete expired session + for (const auto sessionInfo : sessions) { + LocalSessions->StartIdleCheck(sessionInfo, IdleDurationAfterDisconnect); + } + } + private: NKikimrConfig::TLogConfig LogConfig; NKikimrConfig::TTableServiceConfig TableServiceConfig; diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h b/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h index 19f28f9a6cf..c74b210c05b 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h @@ -12,6 +12,8 @@ namespace NKikimr::NKqp { +using TNodeId = ui32; + struct TKqpProxyRequest { TActorId Sender; ui64 SenderCookie = 0; @@ -83,6 +85,7 @@ struct TKqpSessionInfo { NActors::TMonotonic IdleTimeout; // position in the idle list. std::list<TKqpSessionInfo*>::iterator IdlePos; + TNodeId AttachedNodeId; TKqpSessionInfo(const TString& sessionId, const TActorId& workerId, const TString& database, TKqpDbCountersPtr dbCounters, std::vector<i32>&& pos, @@ -95,6 +98,7 @@ struct TKqpSessionInfo { , ReadyPos(std::move(pos)) , IdleTimeout(idleTimeout) , IdlePos(idlePos) + , AttachedNodeId(0) { } }; @@ -107,6 +111,8 @@ class TLocalSessionsRegistry { std::vector<std::vector<TString>> ReadySessions; TIntrusivePtr<IRandomProvider> RandomProvider; std::list<TKqpSessionInfo*> IdleSessions; + // map rpc node to local sessions + THashMap<TNodeId, THashSet<const TKqpSessionInfo*>> AttachedNodesIndex; public: TLocalSessionsRegistry(TIntrusivePtr<IRandomProvider> randomProvider) @@ -114,6 +120,12 @@ public: , RandomProvider(randomProvider) {} + bool AttachSession(const TKqpSessionInfo* sessionInfo, TNodeId nodeId) { + const_cast<TKqpSessionInfo*>(sessionInfo)->AttachedNodeId = nodeId; + auto& actors = AttachedNodesIndex[nodeId]; + return actors.insert(sessionInfo).second; + } + TKqpSessionInfo* Create(const TString& sessionId, const TActorId& workerId, const TString& database, TKqpDbCountersPtr dbCounters, bool supportsBalancing, TDuration idleDuration) @@ -213,8 +225,9 @@ public: return ShutdownInFlightSessions.size(); } - void Erase(const TString& sessionId) { + TNodeId Erase(const TString& sessionId) { auto it = LocalSessions.find(sessionId); + TNodeId result = 0; if (it != LocalSessions.end()) { auto counter = SessionsCountPerDatabase.find(it->second.Database); if (counter != SessionsCountPerDatabase.end()) { @@ -228,8 +241,22 @@ public: RemoveSessionFromLists(&(it->second)); ShutdownInFlightSessions.erase(sessionId); TargetIdIndex.erase(it->second.WorkerId); + + if (const auto nodeId = it->second.AttachedNodeId) { + auto attIt = AttachedNodesIndex.find(nodeId); + if (attIt != AttachedNodesIndex.end()) { + attIt->second.erase(&(it->second)); + if (attIt->second.empty()) { + result = nodeId; + AttachedNodesIndex.erase(attIt); + } + } + } + LocalSessions.erase(it); } + + return result; } bool IsPendingShutdown(const TString& sessionId) const { @@ -257,10 +284,30 @@ public: return LocalSessions.FindPtr(sessionId); } - void Erase(const TActorId& targetId) { + const THashSet<const TKqpSessionInfo*>& FindSessions(const TNodeId& nodeId) const { + auto it = AttachedNodesIndex.find(nodeId); + if (it == AttachedNodesIndex.end()) { + static THashSet<const TKqpSessionInfo*> empty; + return empty; + } + return it->second; + } + + TNodeId Erase(const TActorId& targetId) { + TNodeId nodeId = 0; + auto it = TargetIdIndex.find(targetId); if (it != TargetIdIndex.end()){ - Erase(it->second); + nodeId = Erase(it->second); + } + + return nodeId; + } + + template<typename TCb> + void ForEachNode(TCb&& cb) { + for (const auto& n : AttachedNodesIndex) { + cb(n.first); } } diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index 68cda11e5ec..dc2d84b1dce 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -325,6 +325,7 @@ message TEvCreateSessionResponse { message TPingSessionRequest { optional bytes SessionId = 1; optional uint32 TimeoutMs = 2 [default = 5000]; + optional bool ExtIdleCheck = 3; //It disables kqp proxy idle check } message TEvPingSessionRequest { diff --git a/ydb/services/ydb/CMakeLists.darwin-x86_64.txt b/ydb/services/ydb/CMakeLists.darwin-x86_64.txt index 2077daf05e3..4ef25c28ab7 100644 --- a/ydb/services/ydb/CMakeLists.darwin-x86_64.txt +++ b/ydb/services/ydb/CMakeLists.darwin-x86_64.txt @@ -13,6 +13,7 @@ add_subdirectory(sdk_sessions_pool_ut) add_subdirectory(sdk_sessions_ut) add_subdirectory(table_split_ut) add_subdirectory(ut) +add_subdirectory(ut_helpers) add_library(ydb-services-ydb) target_link_libraries(ydb-services-ydb PUBLIC diff --git a/ydb/services/ydb/CMakeLists.linux-aarch64.txt b/ydb/services/ydb/CMakeLists.linux-aarch64.txt index 3ab94530185..c634dc26935 100644 --- a/ydb/services/ydb/CMakeLists.linux-aarch64.txt +++ b/ydb/services/ydb/CMakeLists.linux-aarch64.txt @@ -13,6 +13,7 @@ add_subdirectory(sdk_sessions_pool_ut) add_subdirectory(sdk_sessions_ut) add_subdirectory(table_split_ut) add_subdirectory(ut) +add_subdirectory(ut_helpers) add_library(ydb-services-ydb) target_link_libraries(ydb-services-ydb PUBLIC diff --git a/ydb/services/ydb/CMakeLists.linux-x86_64.txt b/ydb/services/ydb/CMakeLists.linux-x86_64.txt index 3ab94530185..c634dc26935 100644 --- a/ydb/services/ydb/CMakeLists.linux-x86_64.txt +++ b/ydb/services/ydb/CMakeLists.linux-x86_64.txt @@ -13,6 +13,7 @@ add_subdirectory(sdk_sessions_pool_ut) add_subdirectory(sdk_sessions_ut) add_subdirectory(table_split_ut) add_subdirectory(ut) +add_subdirectory(ut_helpers) add_library(ydb-services-ydb) target_link_libraries(ydb-services-ydb PUBLIC diff --git a/ydb/services/ydb/CMakeLists.windows-x86_64.txt b/ydb/services/ydb/CMakeLists.windows-x86_64.txt index 2077daf05e3..4ef25c28ab7 100644 --- a/ydb/services/ydb/CMakeLists.windows-x86_64.txt +++ b/ydb/services/ydb/CMakeLists.windows-x86_64.txt @@ -13,6 +13,7 @@ add_subdirectory(sdk_sessions_pool_ut) add_subdirectory(sdk_sessions_ut) add_subdirectory(table_split_ut) add_subdirectory(ut) +add_subdirectory(ut_helpers) add_library(ydb-services-ydb) target_link_libraries(ydb-services-ydb PUBLIC diff --git a/ydb/services/ydb/ut/CMakeLists.darwin-x86_64.txt b/ydb/services/ydb/ut/CMakeLists.darwin-x86_64.txt index ca652056594..7d0164194eb 100644 --- a/ydb/services/ydb/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/services/ydb/ut/CMakeLists.darwin-x86_64.txt @@ -41,6 +41,7 @@ target_link_libraries(ydb-services-ydb-ut PUBLIC cpp-client-ydb_operation cpp-client-ydb_scheme cpp-client-ydb_monitoring + services-ydb-ut_helpers ) target_link_options(ydb-services-ydb-ut PRIVATE -Wl,-platform_version,macos,11.0,11.0 diff --git a/ydb/services/ydb/ut/CMakeLists.linux-aarch64.txt b/ydb/services/ydb/ut/CMakeLists.linux-aarch64.txt index 8f4a9f440b8..44cfa20b9da 100644 --- a/ydb/services/ydb/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/services/ydb/ut/CMakeLists.linux-aarch64.txt @@ -41,6 +41,7 @@ target_link_libraries(ydb-services-ydb-ut PUBLIC cpp-client-ydb_operation cpp-client-ydb_scheme cpp-client-ydb_monitoring + services-ydb-ut_helpers ) target_link_options(ydb-services-ydb-ut PRIVATE -ldl diff --git a/ydb/services/ydb/ut/CMakeLists.linux-x86_64.txt b/ydb/services/ydb/ut/CMakeLists.linux-x86_64.txt index 99673c83ec4..4888d5ec79f 100644 --- a/ydb/services/ydb/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/services/ydb/ut/CMakeLists.linux-x86_64.txt @@ -42,6 +42,7 @@ target_link_libraries(ydb-services-ydb-ut PUBLIC cpp-client-ydb_operation cpp-client-ydb_scheme cpp-client-ydb_monitoring + services-ydb-ut_helpers ) target_link_options(ydb-services-ydb-ut PRIVATE -ldl diff --git a/ydb/services/ydb/ut/CMakeLists.windows-x86_64.txt b/ydb/services/ydb/ut/CMakeLists.windows-x86_64.txt index f851c69d196..8e34646aa85 100644 --- a/ydb/services/ydb/ut/CMakeLists.windows-x86_64.txt +++ b/ydb/services/ydb/ut/CMakeLists.windows-x86_64.txt @@ -41,6 +41,7 @@ target_link_libraries(ydb-services-ydb-ut PUBLIC cpp-client-ydb_operation cpp-client-ydb_scheme cpp-client-ydb_monitoring + services-ydb-ut_helpers ) target_sources(ydb-services-ydb-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_bulk_upsert_ut.cpp diff --git a/ydb/services/ydb/ut/ya.make b/ydb/services/ydb/ut/ya.make index ebf7b4976a0..654e30ca365 100644 --- a/ydb/services/ydb/ut/ya.make +++ b/ydb/services/ydb/ut/ya.make @@ -54,6 +54,7 @@ PEERDIR( ydb/public/sdk/cpp/client/ydb_scheme ydb/public/sdk/cpp/client/ydb_monitoring ydb/services/ydb + ydb/services/ydb/ut_helpers ) YQL_LAST_ABI_VERSION() diff --git a/ydb/services/ydb/ut_helpers/CMakeLists.darwin-x86_64.txt b/ydb/services/ydb/ut_helpers/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..5b672d97e57 --- /dev/null +++ b/ydb/services/ydb/ut_helpers/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,22 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(services-ydb-ut_helpers) +target_link_libraries(services-ydb-ut_helpers PUBLIC + contrib-libs-cxxsupp + yutil + api-grpc + api-grpc-draft + api-protos + api-protos-out + cpp-grpc-client +) +target_sources(services-ydb-ut_helpers PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ut_helpers/ut_helpers_query.cpp +) diff --git a/ydb/services/ydb/ut_helpers/CMakeLists.linux-aarch64.txt b/ydb/services/ydb/ut_helpers/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..51193f42857 --- /dev/null +++ b/ydb/services/ydb/ut_helpers/CMakeLists.linux-aarch64.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(services-ydb-ut_helpers) +target_link_libraries(services-ydb-ut_helpers PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + api-grpc + api-grpc-draft + api-protos + api-protos-out + cpp-grpc-client +) +target_sources(services-ydb-ut_helpers PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ut_helpers/ut_helpers_query.cpp +) diff --git a/ydb/services/ydb/ut_helpers/CMakeLists.linux-x86_64.txt b/ydb/services/ydb/ut_helpers/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..51193f42857 --- /dev/null +++ b/ydb/services/ydb/ut_helpers/CMakeLists.linux-x86_64.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(services-ydb-ut_helpers) +target_link_libraries(services-ydb-ut_helpers PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + api-grpc + api-grpc-draft + api-protos + api-protos-out + cpp-grpc-client +) +target_sources(services-ydb-ut_helpers PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ut_helpers/ut_helpers_query.cpp +) diff --git a/ydb/services/ydb/ut_helpers/CMakeLists.txt b/ydb/services/ydb/ut_helpers/CMakeLists.txt new file mode 100644 index 00000000000..f8b31df0c11 --- /dev/null +++ b/ydb/services/ydb/ut_helpers/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/services/ydb/ut_helpers/CMakeLists.windows-x86_64.txt b/ydb/services/ydb/ut_helpers/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..5b672d97e57 --- /dev/null +++ b/ydb/services/ydb/ut_helpers/CMakeLists.windows-x86_64.txt @@ -0,0 +1,22 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(services-ydb-ut_helpers) +target_link_libraries(services-ydb-ut_helpers PUBLIC + contrib-libs-cxxsupp + yutil + api-grpc + api-grpc-draft + api-protos + api-protos-out + cpp-grpc-client +) +target_sources(services-ydb-ut_helpers PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ut_helpers/ut_helpers_query.cpp +) diff --git a/ydb/services/ydb/ut_helpers/ut_helpers_query.cpp b/ydb/services/ydb/ut_helpers/ut_helpers_query.cpp new file mode 100644 index 00000000000..0d67120823d --- /dev/null +++ b/ydb/services/ydb/ut_helpers/ut_helpers_query.cpp @@ -0,0 +1,71 @@ +#include "ut_helpers_query.h" + +#include <ydb/public/api/grpc/ydb_query_v1.grpc.pb.h> +#include <library/cpp/grpc/client/grpc_common.h> +#include <library/cpp/grpc/client/grpc_client_low.h> +#include <library/cpp/threading/future/future.h> + +#include <util/generic/string.h> + +#include <library/cpp/testing/unittest/registar.h> + +using namespace NGrpc; + +namespace NTestHelpers { + +TString CreateQuerySession(const TGRpcClientConfig& clientConfig) { + NGrpc::TGRpcClientLow clientLow; + auto connection = clientLow.CreateGRpcServiceConnection<Ydb::Query::V1::QueryService>(clientConfig); + + Ydb::Query::CreateSessionRequest request; + TString sessionId; + + NGrpc::TResponseCallback<Ydb::Query::CreateSessionResponse> responseCb = + [&sessionId](NGrpc::TGrpcStatus&& grpcStatus, Ydb::Query::CreateSessionResponse&& response) -> void { + UNIT_ASSERT(!grpcStatus.InternalError); + UNIT_ASSERT_C(grpcStatus.GRpcStatusCode == 0, grpcStatus.Msg + " " + grpcStatus.Details); + UNIT_ASSERT_VALUES_EQUAL(response.status(), Ydb::StatusIds::SUCCESS); + UNIT_ASSERT(response.session_id() != ""); + sessionId = response.session_id(); + }; + + connection->DoRequest(request, std::move(responseCb), &Ydb::Query::V1::QueryService::Stub::AsyncCreateSession); + + return sessionId; +} + +using TProcessor = typename NGrpc::IStreamRequestReadProcessor<Ydb::Query::SessionState>::TPtr; +void CheckAttach(const TGRpcClientConfig& clientConfig, const TString& id, + int code, bool& allDoneOk) +{ + const Ydb::StatusIds::StatusCode expected = static_cast<Ydb::StatusIds::StatusCode>(code); + NGrpc::TGRpcClientLow clientLow; + auto connection = clientLow.CreateGRpcServiceConnection<Ydb::Query::V1::QueryService>(clientConfig); + + Ydb::Query::AttachSessionRequest request; + request.set_session_id(id); + + auto promise = NThreading::NewPromise<TProcessor>(); + auto cb = [&allDoneOk, promise, expected](TGrpcStatus grpcStatus, TProcessor processor) mutable { + UNIT_ASSERT(grpcStatus.GRpcStatusCode == grpc::StatusCode::OK); + auto resp = std::make_shared<Ydb::Query::SessionState>(); + processor->Read(resp.get(), [&allDoneOk, resp, promise, processor, expected](TGrpcStatus grpcStatus) mutable { + UNIT_ASSERT(grpcStatus.GRpcStatusCode == grpc::StatusCode::OK); + allDoneOk &= (resp->status() == expected); + if (!allDoneOk) { + Cerr << "Got attach response: " << resp->DebugString() << Endl; + } + promise.SetValue(processor); + }); + }; + + connection->DoStreamRequest<Ydb::Query::AttachSessionRequest, Ydb::Query::SessionState>( + request, + cb, + &Ydb::Query::V1::QueryService::Stub::AsyncAttachSession); + + auto provider = promise.GetFuture().GetValueSync(); + provider->Cancel(); +} + +} diff --git a/ydb/services/ydb/ut_helpers/ut_helpers_query.h b/ydb/services/ydb/ut_helpers/ut_helpers_query.h new file mode 100644 index 00000000000..bdeef075eab --- /dev/null +++ b/ydb/services/ydb/ut_helpers/ut_helpers_query.h @@ -0,0 +1,12 @@ +#pragma once + +#include <util/generic/fwd.h> +#include <library/cpp/grpc/client/grpc_client_low.h> + +namespace NTestHelpers { + +TString CreateQuerySession(const NGrpc::TGRpcClientConfig& clientConfig); +void CheckAttach(const NGrpc::TGRpcClientConfig& clientConfig, const TString& id, + int expected, bool& allDoneOk); + +} diff --git a/ydb/services/ydb/ut_helpers/ya.make b/ydb/services/ydb/ut_helpers/ya.make new file mode 100644 index 00000000000..9047fbebd1b --- /dev/null +++ b/ydb/services/ydb/ut_helpers/ya.make @@ -0,0 +1,16 @@ +LIBRARY() + +SRCS( + ut_helpers_query.cpp +) + +PEERDIR( + ydb/public/api/grpc + ydb/public/api/grpc/draft + ydb/public/api/protos + ydb/public/api/protos/out + library/cpp/grpc/client +) + +END() + diff --git a/ydb/services/ydb/ydb_query_ut.cpp b/ydb/services/ydb/ydb_query_ut.cpp index 46acf56dfb2..fb248d5e9c9 100644 --- a/ydb/services/ydb/ydb_query_ut.cpp +++ b/ydb/services/ydb/ydb_query_ut.cpp @@ -1,67 +1,13 @@ #include "ydb_common_ut.h" +#include "ut_helpers/ut_helpers_query.h" #include <ydb/public/api/grpc/ydb_query_v1.grpc.pb.h> +#include <library/cpp/testing/unittest/tests_data.h> + using namespace NYdb; using namespace NGrpc; - -namespace { - -TString CreateSession(const NGRpcProxy::TGRpcClientConfig& clientConfig) { - NGrpc::TGRpcClientLow clientLow; - auto connection = clientLow.CreateGRpcServiceConnection<Ydb::Query::V1::QueryService>(clientConfig); - - Ydb::Query::CreateSessionRequest request; - TString sessionId; - - NGrpc::TResponseCallback<Ydb::Query::CreateSessionResponse> responseCb = - [&sessionId](NGrpc::TGrpcStatus&& grpcStatus, Ydb::Query::CreateSessionResponse&& response) -> void { - UNIT_ASSERT(!grpcStatus.InternalError); - UNIT_ASSERT(grpcStatus.GRpcStatusCode == 0); - UNIT_ASSERT_VALUES_EQUAL(response.status(), Ydb::StatusIds::SUCCESS); - UNIT_ASSERT(response.session_id() != ""); - sessionId = response.session_id(); - }; - - connection->DoRequest(request, std::move(responseCb), &Ydb::Query::V1::QueryService::Stub::AsyncCreateSession); - - return sessionId; -} - -using TProcessor = typename NGrpc::IStreamRequestReadProcessor<Ydb::Query::SessionState>::TPtr; -void CheckAttach(const NGRpcProxy::TGRpcClientConfig& clientConfig, const TString& id, - const Ydb::StatusIds::StatusCode expected, bool& allDoneOk) -{ - NGrpc::TGRpcClientLow clientLow; - auto connection = clientLow.CreateGRpcServiceConnection<Ydb::Query::V1::QueryService>(clientConfig); - - Ydb::Query::AttachSessionRequest request; - request.set_session_id(id); - - auto promise = NThreading::NewPromise<TProcessor>(); - auto cb = [&allDoneOk, promise, expected](TGrpcStatus grpcStatus, TProcessor processor) mutable { - UNIT_ASSERT(grpcStatus.GRpcStatusCode == grpc::StatusCode::OK); - auto resp = std::make_shared<Ydb::Query::SessionState>(); - processor->Read(resp.get(), [&allDoneOk, resp, promise, processor, expected](TGrpcStatus grpcStatus) mutable { - UNIT_ASSERT(grpcStatus.GRpcStatusCode == grpc::StatusCode::OK); - allDoneOk &= (resp->status() == expected); - if (!allDoneOk) { - Cerr << "Got attach response: " << resp->DebugString() << Endl; - } - promise.SetValue(processor); - }); - }; - - connection->DoStreamRequest<Ydb::Query::AttachSessionRequest, Ydb::Query::SessionState>( - request, - cb, - &Ydb::Query::V1::QueryService::Stub::AsyncAttachSession); - - auto processor = promise.GetFuture().GetValueSync(); - processor->Cancel(); -} - -} +using namespace NTestHelpers; Y_UNIT_TEST_SUITE(YdbQueryService) { Y_UNIT_TEST(TestCreateAndAttachSession) { @@ -73,7 +19,7 @@ Y_UNIT_TEST_SUITE(YdbQueryService) { auto clientConfig = NGRpcProxy::TGRpcClientConfig(location); bool allDoneOk = true; - TString sessionId = CreateSession(clientConfig); + TString sessionId = CreateQuerySession(clientConfig); UNIT_ASSERT(sessionId); |