diff options
author | dcherednik <dcherednik@ydb.tech> | 2023-08-02 15:20:53 +0300 |
---|---|---|
committer | dcherednik <dcherednik@ydb.tech> | 2023-08-02 15:20:53 +0300 |
commit | d199195371ee005f086b1a6d061c955c828b199d (patch) | |
tree | dae85b0bf6915ca56ce03c19c92cda704ee3c9d2 | |
parent | bf2a438dc5d975e2b908ee20895e46331d23211b (diff) | |
download | ydb-d199195371ee005f086b1a6d061c955c828b199d.tar.gz |
Experimental session support for query service. KIKIMR-18788
30 files changed, 720 insertions, 131 deletions
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h index cc852fa465f..3275666d298 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.h +++ b/ydb/core/kqp/ut/common/kqp_ut_common.h @@ -143,8 +143,10 @@ public: .UseQueryCache(false)); } - NYdb::NQuery::TQueryClient GetQueryClient() const { - return NYdb::NQuery::TQueryClient(*Driver); + NYdb::NQuery::TQueryClient GetQueryClient( + NYdb::NQuery::TClientSettings settings = NYdb::NQuery::TClientSettings()) const + { + return NYdb::NQuery::TQueryClient(*Driver, settings); } bool IsUsingSnapshotReads() const { diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp index 8e7df95f899..c46f698bf92 100644 --- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp @@ -4,6 +4,8 @@ #include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> #include <ydb/public/sdk/cpp/client/ydb_types/operation/operation.h> +#include <ydb/core/kqp/counters/kqp_counters.h> + namespace NKikimr { namespace NKqp { @@ -11,6 +13,38 @@ using namespace NYdb; using namespace NYdb::NQuery; Y_UNIT_TEST_SUITE(KqpQueryService) { + Y_UNIT_TEST(SessionFromPoolError) { + auto kikimr = DefaultKikimrRunner(); + auto settings = NYdb::NQuery::TClientSettings().Database("WrongDB"); + auto db = kikimr.GetQueryClient(settings); + + auto result = db.GetSession().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::NOT_FOUND, result.GetIssues().ToString()); + } + + Y_UNIT_TEST(SessionFromPoolSuccess) { + auto kikimr = DefaultKikimrRunner(); + NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters); + + { + auto db = kikimr.GetQueryClient(); + + TString id; + { + auto result = db.GetSession().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT(result.GetSession().GetId()); + id = result.GetSession().GetId(); + } + { + auto result = db.GetSession().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetSession().GetId(), id); + } + } + WaitForZeroSessions(counters); + } + Y_UNIT_TEST(StreamExecuteQueryPure) { auto kikimr = DefaultKikimrRunner(); auto db = kikimr.GetQueryClient(); diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h index 3f3b85ede05..40fa6b39f8c 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h @@ -399,6 +399,7 @@ public: TStreamRpc<TService, TRequest, TResponse, NGrpc::TStreamRequestReadProcessor> rpc, TDbDriverStatePtr dbState, const TRpcRequestSettings& requestSettings, + const TEndpointKey& preferredEndpoint = TEndpointKey(), std::shared_ptr<IQueueClientContext> context = nullptr) { using NGrpc::TGrpcStatus; @@ -489,7 +490,7 @@ public: std::move(rpc), std::move(meta), context.get()); - }, dbState, TEndpointKey(), endpointPolicy); + }, dbState, preferredEndpoint, endpointPolicy); } template<class TService, class TRequest, class TResponse, class TCallback> diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp index 6a083f47509..617e897b182 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp @@ -4,7 +4,7 @@ namespace NYdb { -static ui64 GetNodeIdFromSession(const TStringType& sessionId) { +ui64 GetNodeIdFromSession(const TStringType& sessionId) { if (sessionId.empty()) { return 0; } diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h index a932f7e54e3..46170b0ca9e 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h @@ -12,6 +12,7 @@ namespace NYdb { //////////////////////////////////////////////////////////////////////////////// +ui64 GetNodeIdFromSession(const TStringType& sessionId); class TKqpSessionCommon : public TEndpointObj { public: diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_client/session_client.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_client/session_client.h index ab6393261cf..49f26b28d0e 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_client/session_client.h +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_client/session_client.h @@ -11,7 +11,6 @@ class TKqpSessionCommon; class ISessionClient { public: virtual ~ISessionClient() = default; - virtual void ScheduleTaskUnsafe(std::function<void()>&& fn, TDuration timeout) = 0; virtual void DeleteSession(TKqpSessionCommon* sessionImpl) = 0; // TODO: Try to remove from ISessionClient virtual bool ReturnSession(TKqpSessionCommon* sessionImpl) = 0; diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.darwin-x86_64.txt index f46a9677174..8cdf68c0bcb 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.darwin-x86_64.txt +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.darwin-x86_64.txt @@ -14,6 +14,7 @@ target_link_libraries(impl-ydb_internal-session_pool PUBLIC cpp-threading-future api-protos client-impl-ydb_endpoints + client-ydb_types-operation public-issue-protos ) target_sources(impl-ydb_internal-session_pool PRIVATE diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.linux-aarch64.txt index 6248ba9c489..f399a8d31e8 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.linux-aarch64.txt +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.linux-aarch64.txt @@ -15,6 +15,7 @@ target_link_libraries(impl-ydb_internal-session_pool PUBLIC cpp-threading-future api-protos client-impl-ydb_endpoints + client-ydb_types-operation public-issue-protos ) target_sources(impl-ydb_internal-session_pool PRIVATE diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.linux-x86_64.txt index 6248ba9c489..f399a8d31e8 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.linux-x86_64.txt +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.linux-x86_64.txt @@ -15,6 +15,7 @@ target_link_libraries(impl-ydb_internal-session_pool PUBLIC cpp-threading-future api-protos client-impl-ydb_endpoints + client-ydb_types-operation public-issue-protos ) target_sources(impl-ydb_internal-session_pool PRIVATE diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.windows-x86_64.txt b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.windows-x86_64.txt index f46a9677174..8cdf68c0bcb 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.windows-x86_64.txt +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.windows-x86_64.txt @@ -14,6 +14,7 @@ target_link_libraries(impl-ydb_internal-session_pool PUBLIC cpp-threading-future api-protos client-impl-ydb_endpoints + client-ydb_types-operation public-issue-protos ) target_sources(impl-ydb_internal-session_pool PRIVATE diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.cpp b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.cpp index 4b7310084bd..8dc87b46f59 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.cpp +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.cpp @@ -4,11 +4,17 @@ #include <ydb/public/sdk/cpp/client/impl/ydb_internal/plain_status/status.h> #undef INCLUDE_YDB_INTERNAL_H +#include <ydb/public/sdk/cpp/client/resources/ydb_resources.h> +#include <ydb/public/sdk/cpp/client/ydb_types/operation/operation.h> + +#include <util/random/random.h> + namespace NYdb { namespace NSessionPool { using namespace NThreading; +constexpr ui64 KEEP_ALIVE_RANDOM_FRACTION = 4; static const TStatus CLIENT_RESOURCE_EXHAUSTED_ACTIVE_SESSION_LIMIT = TStatus( TPlainStatus( EStatus::CLIENT_RESOURCE_EXHAUSTED, @@ -16,6 +22,37 @@ static const TStatus CLIENT_RESOURCE_EXHAUSTED_ACTIVE_SESSION_LIMIT = TStatus( ) ); +TStatus GetStatus(const TOperation& operation) { + return operation.Status(); +} + +TStatus GetStatus(const TStatus& status) { + return status; +} + +TDuration RandomizeThreshold(TDuration duration) { + TDuration::TValue value = duration.GetValue(); + if (KEEP_ALIVE_RANDOM_FRACTION) { + const i64 randomLimit = value / KEEP_ALIVE_RANDOM_FRACTION; + if (randomLimit < 2) + return duration; + value += static_cast<i64>(RandomNumber<ui64>(randomLimit)); + } + return TDuration::FromValue(value); +} + +bool IsSessionCloseRequested(const TStatus& status) { + const auto& meta = status.GetResponseMetadata(); + auto hints = meta.equal_range(NYdb::YDB_SERVER_HINTS); + for(auto it = hints.first; it != hints.second; ++it) { + if (it->second == NYdb::YDB_SESSION_CLOSE) { + return true; + } + } + + return false; +} + TSessionPool::TWaitersQueue::TWaitersQueue(ui32 maxQueueSize, TDuration maxWaitSessionTimeout) : MaxQueueSize_(maxQueueSize) , MaxWaitSessionTimeout_(maxWaitSessionTimeout) diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.h index 561f5e119f9..7d9f2fc2b5b 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.h +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.h @@ -7,6 +7,7 @@ namespace NYdb { +class TOperation; namespace NSessionPool { class IGetSessionCtx : private TNonCopyable { @@ -18,10 +19,67 @@ public: virtual void ReplyNewSession() = 0; }; +//How often run session pool keep alive check +constexpr TDuration PERIODIC_ACTION_INTERVAL = TDuration::Seconds(5); constexpr TDuration MAX_WAIT_SESSION_TIMEOUT = TDuration::Seconds(5); //Max time to wait session constexpr ui64 PERIODIC_ACTION_BATCH_SIZE = 10; //Max number of tasks to perform during one interval constexpr TDuration CREATE_SESSION_INTERNAL_TIMEOUT = TDuration::Seconds(2); //Timeout for createSession call inside session pool +TStatus GetStatus(const TOperation& operation); +TStatus GetStatus(const TStatus& status); + +TDuration RandomizeThreshold(TDuration duration); +bool IsSessionCloseRequested(const TStatus& status); + +template<typename TResponse> +NThreading::TFuture<TResponse> InjectSessionStatusInterception( + std::shared_ptr<::NYdb::TKqpSessionCommon> impl, NThreading::TFuture<TResponse> asyncResponse, + bool updateTimeout, + TDuration timeout, + std::function<void(const TResponse&, TKqpSessionCommon&)> cb = {}) +{ + auto promise = NThreading::NewPromise<TResponse>(); + asyncResponse.Subscribe([impl, promise, cb, updateTimeout, timeout](NThreading::TFuture<TResponse> future) mutable { + Y_VERIFY(future.HasValue()); + + // TResponse can hold refcounted user provided data (TSession for example) + // and we do not want to have copy of it (for example it can cause delay dtor call) + // so using move semantic here is mandatory. + // Also we must reset captured shared pointer to session impl + TResponse value = std::move(future.ExtractValue()); + + const TStatus& status = GetStatus(value); + // Exclude CLIENT_RESOURCE_EXHAUSTED from transport errors which can cause to session disconnect + // since we have guarantee this request wasn't been started to execute. + + if (status.IsTransportError() && status.GetStatus() != EStatus::CLIENT_RESOURCE_EXHAUSTED) { + impl->MarkBroken(); + } else if (status.GetStatus() == EStatus::SESSION_BUSY) { + impl->MarkBroken(); + } else if (status.GetStatus() == EStatus::BAD_SESSION) { + impl->MarkBroken(); + } else if (IsSessionCloseRequested(status)) { + impl->MarkAsClosing(); + } else { + // NOTE: About GetState and lock + // Simultanious call multiple requests on the same session make no sence, due to server limitation. + // But user can perform this call, right now we do not protect session from this, it may cause + // raise on session state if respoise is not success. + // It should not be a problem - in case of this race we close session + // or put it in to settler. + if (updateTimeout && status.GetStatus() != EStatus::CLIENT_RESOURCE_EXHAUSTED) { + impl->ScheduleTimeToTouch(RandomizeThreshold(timeout), impl->GetState() == TKqpSessionCommon::EState::S_ACTIVE); + } + } + if (cb) { + cb(value, *impl); + } + impl.reset(); + promise.SetValue(std::move(value)); + }); + return promise.GetFuture(); +} + class TSessionPool { private: class TWaitersQueue { diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/ya.make b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/ya.make index 9e3c6beae85..0bb2529a8da 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/ya.make +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/ya.make @@ -8,6 +8,7 @@ PEERDIR( library/cpp/threading/future ydb/public/api/protos ydb/public/sdk/cpp/client/impl/ydb_endpoints + ydb/public/sdk/cpp/client/ydb_types/operation ydb/library/yql/public/issue/protos ) diff --git a/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt b/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt index 160fe391c82..914ec967116 100644 --- a/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt +++ b/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt @@ -1 +1 @@ -2.5.5
\ No newline at end of file +2.6.0
\ No newline at end of file diff --git a/ydb/public/sdk/cpp/client/ydb_query/client.cpp b/ydb/public/sdk/cpp/client/ydb_query/client.cpp index 253e14bc425..2ca4d633572 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/client.cpp +++ b/ydb/public/sdk/cpp/client/ydb_query/client.cpp @@ -1,7 +1,11 @@ #include "client.h" +#include "impl/client_session.h" #define INCLUDE_YDB_INTERNAL_H +#include <ydb/public/sdk/cpp/client/impl/ydb_endpoints/endpoints.h> +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/session_client/session_client.h> #include <ydb/public/sdk/cpp/client/impl/ydb_internal/make_request/make.h> +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.h> #undef INCLUDE_YDB_INTERNAL_H #include <ydb/public/lib/operation_id/operation_id.h> @@ -10,11 +14,17 @@ namespace NYdb::NQuery { -class TQueryClient::TImpl: public TClientImplCommon<TQueryClient::TImpl> { +TCreateSessionSettings::TCreateSessionSettings() { + ClientTimeout_ = TDuration::Seconds(5); +}; + +class TQueryClient::TImpl: public TClientImplCommon<TQueryClient::TImpl>, public ISessionClient { + friend class ::NYdb::NQuery::TSession; public: TImpl(std::shared_ptr<TGRpcConnectionsImpl>&& connections, const TClientSettings& settings) : TClientImplCommon(std::move(connections), settings) , Settings_(settings) + , SessionPool_(Settings_.SessionPoolSettings_.MaxActiveSessions_) { } @@ -23,15 +33,18 @@ public: } TAsyncExecuteQueryIterator StreamExecuteQuery(const TString& query, const TTxControl& txControl, - const TMaybe<TParams>& params, const TExecuteQuerySettings& settings) + const TMaybe<TParams>& params, const TExecuteQuerySettings& settings, const TString& sessionId = {}) { - return TExecQueryImpl::StreamExecuteQuery(Connections_, DbDriverState_, query, txControl, params, settings); + return TExecQueryImpl::StreamExecuteQuery( + Connections_, DbDriverState_, query, txControl, params, settings, sessionId); } TAsyncExecuteQueryResult ExecuteQuery(const TString& query, const TTxControl& txControl, - const TMaybe<TParams>& params, const TExecuteQuerySettings& settings) + const TMaybe<TParams>& params, const TExecuteQuerySettings& settings, + const TString& sessionId = {}) { - return TExecQueryImpl::ExecuteQuery(Connections_, DbDriverState_, query, txControl, params, settings); + return TExecQueryImpl::ExecuteQuery( + Connections_, DbDriverState_, query, txControl, params, settings, sessionId); } NThreading::TFuture<TScriptExecutionOperation> ExecuteScript(const TString& script, const TExecuteScriptSettings& settings) { @@ -127,13 +140,222 @@ public: return promise.GetFuture(); } + void DeleteSession(TKqpSessionCommon* sessionImpl) override { + //TODO: Remove this copy-paste + + // Closing not owned by session pool session should not fire getting new session + if (sessionImpl->IsOwnedBySessionPool()) { + if (SessionPool_.CheckAndFeedWaiterNewSession(sessionImpl->NeedUpdateActiveCounter())) { + // We requested new session for waiter which already incremented + // active session counter and old session will be deleted + // - skip update active counter in this case + sessionImpl->SetNeedUpdateActiveCounter(false); + } + } + + if (sessionImpl->NeedUpdateActiveCounter()) { + SessionPool_.DecrementActiveCounter(); + } + + delete sessionImpl; + } + + bool ReturnSession(TKqpSessionCommon* sessionImpl) override { + Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_ACTIVE || + sessionImpl->GetState() == TSession::TImpl::S_IDLE); + + //TODO: Remove this copy-paste from table client + bool needUpdateCounter = sessionImpl->NeedUpdateActiveCounter(); + // Also removes NeedUpdateActiveCounter flag + sessionImpl->MarkIdle(); + sessionImpl->SetTimeInterval(TDuration::Zero()); + if (!SessionPool_.ReturnSession(sessionImpl, needUpdateCounter)) { + sessionImpl->SetNeedUpdateActiveCounter(needUpdateCounter); + return false; + } + return true; + } + + void DoAttachSession(Ydb::Query::CreateSessionResponse* resp, + NThreading::TPromise<TCreateSessionResult> promise, const TString& endpoint, + std::shared_ptr<TQueryClient::TImpl> client) + { + using TStreamProcessorPtr = TSession::TImpl::TStreamProcessorPtr; + Ydb::Query::AttachSessionRequest request; + const auto sessionId = resp->session_id(); + request.set_session_id(sessionId); + + const auto endpointKey = TEndpointKey(endpoint, GetNodeIdFromSession(sessionId)); + + auto args = std::make_shared<TSession::TImpl::TAttachSessionArgs>(promise, sessionId, endpoint, client); + + // Do not pass client timeout here. Session must be alive + static const TRpcRequestSettings rpcSettings; + + Connections_->StartReadStream< + Ydb::Query::V1::QueryService, + Ydb::Query::AttachSessionRequest, + Ydb::Query::SessionState> + ( + std::move(request), + [args] (TPlainStatus status, TStreamProcessorPtr processor) mutable { + if (processor) { + TSession::TImpl::MakeImplAsync(processor, args); + } else { + TStatus st(std::move(status)); + args->Promise.SetValue(TCreateSessionResult(std::move(st), TSession())); + } + }, + &Ydb::Query::V1::QueryService::Stub::AsyncAttachSession, + DbDriverState_, + rpcSettings, + endpointKey); + } + + TAsyncCreateSessionResult CreateAttachedSession(TDuration timeout) { + using namespace Ydb::Query; + + Ydb::Query::CreateSessionRequest request; + + auto promise = NThreading::NewPromise<TCreateSessionResult>(); + + auto self = shared_from_this(); + + auto extractor = [promise, self] (Ydb::Query::CreateSessionResponse* resp, TPlainStatus status) mutable { + if (resp) { + if (resp->status() != Ydb::StatusIds::SUCCESS) { + NYql::TIssues opIssues; + NYql::IssuesFromMessage(resp->issues(), opIssues); + TStatus st(static_cast<EStatus>(resp->status()), std::move(opIssues)); + promise.SetValue(TCreateSessionResult(std::move(st), TSession())); + } else { + self->DoAttachSession(resp, promise, status.Endpoint, self); + } + } else { + TStatus st(std::move(status)); + promise.SetValue(TCreateSessionResult(std::move(st), TSession())); + } + }; + + TRpcRequestSettings rpcSettings; + rpcSettings.ClientTimeout = timeout; + + Connections_->Run<V1::QueryService, CreateSessionRequest, CreateSessionResponse>( + std::move(request), + extractor, + &V1::QueryService::Stub::AsyncCreateSession, + DbDriverState_, + rpcSettings, + TEndpointKey()); + + return promise.GetFuture(); + } + + TAsyncCreateSessionResult GetSession(const TCreateSessionSettings& settings) { + using namespace NSessionPool; + + class TQueryClientGetSessionCtx : public IGetSessionCtx { + public: + TQueryClientGetSessionCtx(std::shared_ptr<TQueryClient::TImpl> client, TDuration timeout) + : Promise(NThreading::NewPromise<TCreateSessionResult>()) + , Client(client) + , ClientTimeout(timeout) + {} + + TAsyncCreateSessionResult GetFuture() { + return Promise.GetFuture(); + } + + void ReplyError(TStatus status) override { + TSession session; + ScheduleReply(TCreateSessionResult(std::move(status), std::move(session))); + } + + void ReplySessionToUser(TKqpSessionCommon* session) override { + TCreateSessionResult val( + TStatus(TPlainStatus()), + TSession( + Client, + static_cast<TSession::TImpl*>(session) + ) + ); + + ScheduleReply(std::move(val)); + } + + void ReplyNewSession() override { + Client->CreateAttachedSession(ClientTimeout).Subscribe( + [promise{std::move(Promise)}](TAsyncCreateSessionResult future) mutable + { + promise.SetValue(future.ExtractValue()); + }); + } + + private: + void ScheduleReply(TCreateSessionResult val) { + Promise.SetValue(std::move(val)); + } + NThreading::TPromise<TCreateSessionResult> Promise; + std::shared_ptr<TQueryClient::TImpl> Client; + TDuration ClientTimeout; + }; + + auto ctx = std::make_unique<TQueryClientGetSessionCtx>(shared_from_this(), settings.ClientTimeout_); + auto future = ctx->GetFuture(); + SessionPool_.GetSession(std::move(ctx)); + return future; + } + + i64 GetActiveSessionCount() const { + return SessionPool_.GetActiveSessions(); + } + + i64 GetActiveSessionsLimit() const { + return SessionPool_.GetActiveSessionsLimit(); + } + + i64 GetCurrentPoolSize() const { + return SessionPool_.GetCurrentPoolSize(); + } + + void StartPeriodicSessionPoolTask() { + // Session pool guarantees than client is alive during call callbacks + auto deletePredicate = [this](TKqpSessionCommon* s, size_t sessionsCount) { + + const auto& sessionPoolSettings = Settings_.SessionPoolSettings_; + const auto spentTime = s->GetTimeToTouchFast() - s->GetTimeInPastFast(); + + if (spentTime >= sessionPoolSettings.CloseIdleThreshold_) { + if (sessionsCount > sessionPoolSettings.MinPoolSize_) { + return true; + } + } + + return false; + }; + + // No need to keep-alive + auto keepAliveCmd = [](TKqpSessionCommon*) { + }; + + std::weak_ptr<TQueryClient::TImpl> weak = shared_from_this(); + Connections_->AddPeriodicTask( + SessionPool_.CreatePeriodicTask( + weak, + std::move(keepAliveCmd), + std::move(deletePredicate) + ), NSessionPool::PERIODIC_ACTION_INTERVAL); + } + private: TClientSettings Settings_; + NSessionPool::TSessionPool SessionPool_; }; TQueryClient::TQueryClient(const TDriver& driver, const TClientSettings& settings) : Impl_(new TQueryClient::TImpl(CreateInternalInterface(driver), settings)) { + Impl_->StartPeriodicSessionPoolTask(); } TAsyncExecuteQueryResult TQueryClient::ExecuteQuery(const TString& query, const TTxControl& txControl, @@ -173,4 +395,87 @@ TAsyncFetchScriptResultsResult TQueryClient::FetchScriptResults(const NKikimr::N return Impl_->FetchScriptResults(operationId, resultSetIndex, settings); } +TAsyncCreateSessionResult TQueryClient::GetSession(const TCreateSessionSettings& settings) +{ + return Impl_->GetSession(settings); +} + +i64 TQueryClient::GetActiveSessionCount() const { + return Impl_->GetActiveSessionCount(); +} + +i64 TQueryClient::GetActiveSessionsLimit() const { + return Impl_->GetActiveSessionsLimit(); +} + +i64 TQueryClient::GetCurrentPoolSize() const { + return Impl_->GetCurrentPoolSize(); +} + +//////////////////////////////////////////////////////////////////////////////// + +TCreateSessionResult::TCreateSessionResult(TStatus&& status, TSession&& session) + : TStatus(std::move(status)) + , Session_(std::move(session)) +{} + +TSession TCreateSessionResult::GetSession() const { + CheckStatusOk("TCreateSessionResult::GetSession"); + return Session_; +} + +//////////////////////////////////////////////////////////////////////////////// + +TSession::TSession() +{} + +TSession::TSession(std::shared_ptr<TQueryClient::TImpl> client, TSession::TImpl* session) + : Client_(client) + , SessionImpl_(session, TKqpSessionCommon::GetSmartDeleter(client)) +{} + +const TString& TSession::GetId() const { + return SessionImpl_->GetId(); +} + +TAsyncExecuteQueryResult TSession::ExecuteQuery(const TString& query, const TTxControl& txControl, + const TExecuteQuerySettings& settings) +{ + return NSessionPool::InjectSessionStatusInterception( + SessionImpl_, + Client_->ExecuteQuery(query, txControl, {}, settings, SessionImpl_->GetId()), + true, + Client_->Settings_.SessionPoolSettings_.CloseIdleThreshold_); +} + +TAsyncExecuteQueryResult TSession::ExecuteQuery(const TString& query, const TTxControl& txControl, + const TParams& params, const TExecuteQuerySettings& settings) +{ + return NSessionPool::InjectSessionStatusInterception( + SessionImpl_, + Client_->ExecuteQuery(query, txControl, params, settings, SessionImpl_->GetId()), + true, + Client_->Settings_.SessionPoolSettings_.CloseIdleThreshold_); +} + +TAsyncExecuteQueryIterator TSession::StreamExecuteQuery(const TString& query, const TTxControl& txControl, + const TExecuteQuerySettings& settings) +{ + return NSessionPool::InjectSessionStatusInterception( + SessionImpl_, + Client_->StreamExecuteQuery(query, txControl, {}, settings, SessionImpl_->GetId()), + true, + Client_->Settings_.SessionPoolSettings_.CloseIdleThreshold_); +} + +TAsyncExecuteQueryIterator TSession::StreamExecuteQuery(const TString& query, const TTxControl& txControl, + const TParams& params, const TExecuteQuerySettings& settings) +{ + return NSessionPool::InjectSessionStatusInterception( + SessionImpl_, + Client_->StreamExecuteQuery(query, txControl, params, settings, SessionImpl_->GetId()), + true, + Client_->Settings_.SessionPoolSettings_.CloseIdleThreshold_); +} + } // namespace NYdb::NQuery diff --git a/ydb/public/sdk/cpp/client/ydb_query/client.h b/ydb/public/sdk/cpp/client/ydb_query/client.h index 7e823825e27..88e73e6689a 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/client.h +++ b/ydb/public/sdk/cpp/client/ydb_query/client.h @@ -15,15 +15,43 @@ namespace NYdb { namespace NYdb::NQuery { +struct TCreateSessionSettings : public TSimpleRequestSettings<TCreateSessionSettings> { + TCreateSessionSettings(); +}; + +class TCreateSessionResult; +using TAsyncCreateSessionResult = NThreading::TFuture<TCreateSessionResult>; + +struct TSessionPoolSettings { + using TSelf = TSessionPoolSettings; + + // Max number of sessions client can get from session pool + FLUENT_SETTING_DEFAULT(ui32, MaxActiveSessions, 50); + + // Max time session to be in idle state before closing + FLUENT_SETTING_DEFAULT(TDuration, CloseIdleThreshold, TDuration::Minutes(1)); + + // Min number of session in session pool. + // Sessions will not be closed by CloseIdleThreshold if the number of sessions less then this limit. + FLUENT_SETTING_DEFAULT(ui32, MinPoolSize, 10); +}; + struct TClientSettings : public TCommonClientSettingsBase<TClientSettings> { + using TSessionPoolSettings = TSessionPoolSettings; using TSelf = TClientSettings; + FLUENT_SETTING(TSessionPoolSettings, SessionPoolSettings); }; // ! WARNING: Experimental API // ! This API is currently in experimental state and is a subject for changes. // ! No backward and/or forward compatibility guarantees are provided. // ! DO NOT USE for production workloads. +class TSession; class TQueryClient { + friend class TSession; +public: + using TSettings = TClientSettings; + using TSession = TSession; public: TQueryClient(const TDriver& driver, const TClientSettings& settings = TClientSettings()); @@ -45,9 +73,57 @@ public: TAsyncFetchScriptResultsResult FetchScriptResults(const NKikimr::NOperationId::TOperationId& operationId, int64_t resultSetIndex, const TFetchScriptResultsSettings& settings = TFetchScriptResultsSettings()); + TAsyncCreateSessionResult GetSession(const TCreateSessionSettings& settings = TCreateSessionSettings()); + + //! Returns number of active sessions given via session pool + i64 GetActiveSessionCount() const; + + //! Returns the maximum number of sessions in session pool + i64 GetActiveSessionsLimit() const; + + //! Returns the size of session pool + i64 GetCurrentPoolSize() const; + private: class TImpl; std::shared_ptr<TImpl> Impl_; }; +class TSession { + friend class TQueryClient; +public: + const TString& GetId() const; + + TAsyncExecuteQueryResult ExecuteQuery(const TString& query, const TTxControl& txControl, + const TExecuteQuerySettings& settings = TExecuteQuerySettings()); + + TAsyncExecuteQueryResult ExecuteQuery(const TString& query, const TTxControl& txControl, + const TParams& params, const TExecuteQuerySettings& settings = TExecuteQuerySettings()); + + TAsyncExecuteQueryIterator StreamExecuteQuery(const TString& query, const TTxControl& txControl, + const TExecuteQuerySettings& settings = TExecuteQuerySettings()); + + TAsyncExecuteQueryIterator StreamExecuteQuery(const TString& query, const TTxControl& txControl, + const TParams& params, const TExecuteQuerySettings& settings = TExecuteQuerySettings()); + + + class TImpl; +private: + TSession(); + TSession(std::shared_ptr<TQueryClient::TImpl> client, TSession::TImpl* sessionImpl); + + std::shared_ptr<TQueryClient::TImpl> Client_; + std::shared_ptr<TSession::TImpl> SessionImpl_; +}; + +class TCreateSessionResult: public TStatus { + friend class TSession::TImpl; +public: + TCreateSessionResult(TStatus&& status, TSession&& session); + TSession GetSession() const; + +private: + TSession Session_; +}; + } // namespace NYdb::NQuery diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.darwin-x86_64.txt index 89fba989351..5636e882602 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.darwin-x86_64.txt +++ b/ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.darwin-x86_64.txt @@ -18,4 +18,5 @@ target_link_libraries(client-ydb_query-impl PUBLIC ) target_sources(client-ydb_query-impl PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp ) diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.linux-aarch64.txt index 89e9a242f2d..09ad124d137 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.linux-aarch64.txt +++ b/ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.linux-aarch64.txt @@ -19,4 +19,5 @@ target_link_libraries(client-ydb_query-impl PUBLIC ) target_sources(client-ydb_query-impl PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp ) diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.linux-x86_64.txt index 89e9a242f2d..09ad124d137 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.linux-x86_64.txt +++ b/ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.linux-x86_64.txt @@ -19,4 +19,5 @@ target_link_libraries(client-ydb_query-impl PUBLIC ) target_sources(client-ydb_query-impl PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp ) diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.windows-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.windows-x86_64.txt index 89fba989351..5636e882602 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.windows-x86_64.txt +++ b/ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.windows-x86_64.txt @@ -18,4 +18,5 @@ target_link_libraries(client-ydb_query-impl PUBLIC ) target_sources(client-ydb_query-impl PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp ) diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp b/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp new file mode 100644 index 00000000000..daa45e45efb --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp @@ -0,0 +1,62 @@ +#include "client_session.h" + +#define INCLUDE_YDB_INTERNAL_H +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/plain_status/status.h> +#undef INCLUDE_YDB_INTERNAL_H + +#include <ydb/library/yql/public/issue/yql_issue_message.h> + +namespace NYdb::NQuery { + +TSession::TImpl::TImpl(TStreamProcessorPtr ptr, const TString& sessionId, const TString& endpoint) + : TKqpSessionCommon(sessionId, endpoint, true) + , StreamProcessor_(ptr) +{ + MarkActive(); + SetNeedUpdateActiveCounter(true); +} + +TSession::TImpl::~TImpl() +{ + StreamProcessor_->Cancel(); +} + +void TSession::TImpl::MakeImplAsync(TStreamProcessorPtr ptr, + std::shared_ptr<TAttachSessionArgs> args) +{ + auto resp = std::make_shared<Ydb::Query::SessionState>(); + ptr->Read(resp.get(), [args, resp, ptr](NGrpc::TGrpcStatus grpcStatus) mutable { + if (grpcStatus.GRpcStatusCode != grpc::StatusCode::OK) { + TStatus st(TPlainStatus(grpcStatus, args->Endpoint)); + args->Promise.SetValue(TCreateSessionResult(std::move(st), TSession())); + + } else { + if (resp->status() == Ydb::StatusIds::SUCCESS) { + NYdb::TStatus st(TPlainStatus(grpcStatus, args->Endpoint)); + TSession::TImpl::NewSmartShared(ptr, std::move(args), st); + + } else { + NYql::TIssues opIssues; + NYql::IssuesFromMessage(resp->issues(), opIssues); + TStatus st(static_cast<EStatus>(resp->status()), std::move(opIssues)); + args->Promise.SetValue(TCreateSessionResult(std::move(st), TSession())); + } + } + }); +} + +void TSession::TImpl::NewSmartShared(TStreamProcessorPtr ptr, + std::shared_ptr<TAttachSessionArgs> args, NYdb::TStatus st) +{ + args->Promise.SetValue( + TCreateSessionResult( + std::move(st), + TSession( + args->Client, + new TSession::TImpl(ptr, args->SessionId, args->Endpoint) + ) + ) + ); +} + +} diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.h b/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.h new file mode 100644 index 00000000000..fd9d224291e --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.h @@ -0,0 +1,40 @@ +#pragma once + +#include <ydb/public/sdk/cpp/client/ydb_query/client.h> +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h> + +namespace NYdb::NQuery { + +class TSession::TImpl : public TKqpSessionCommon { +public: + struct TAttachSessionArgs { + TAttachSessionArgs(NThreading::TPromise<TCreateSessionResult> promise, + TString sessionId, + TString endpoint, + std::shared_ptr<TQueryClient::TImpl> client) + : Promise(promise) + , SessionId(sessionId) + , Endpoint(endpoint) + , Client(client) + { } + NThreading::TPromise<TCreateSessionResult> Promise; + TString SessionId; + TString Endpoint; + std::shared_ptr<TQueryClient::TImpl> Client; + }; + + using TResponse = Ydb::Query::SessionState; + using TStreamProcessorPtr = NGrpc::IStreamRequestReadProcessor<TResponse>::TPtr; + TImpl(TStreamProcessorPtr ptr, const TString& id, const TString& endpoint); + ~TImpl(); + + static void MakeImplAsync(TStreamProcessorPtr processor, std::shared_ptr<TAttachSessionArgs> args); + +private: + static void NewSmartShared(TStreamProcessorPtr ptr, std::shared_ptr<TAttachSessionArgs> args, NYdb::TStatus status); + +private: + TStreamProcessorPtr StreamProcessor_; +}; + +} diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp b/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp index a511c0ef42a..6cf63446187 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp +++ b/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp @@ -182,13 +182,16 @@ struct TExecuteQueryBuffer : public TThrRefBase, TNonCopyable { TFuture<std::pair<TPlainStatus, TExecuteQueryProcessorPtr>> StreamExecuteQueryImpl( const std::shared_ptr<TGRpcConnectionsImpl>& connections, const TDbDriverStatePtr& driverState, const TString& query, const TTxControl& txControl, const ::google::protobuf::Map<TString, Ydb::TypedValue>* params, - const TExecuteQuerySettings& settings) + const TExecuteQuerySettings& settings, const TString& sessionId) { auto request = MakeRequest<Ydb::Query::ExecuteQueryRequest>(); request.set_exec_mode(::Ydb::Query::ExecMode(settings.ExecMode_)); request.set_stats_mode(::Ydb::Query::StatsMode(settings.StatsMode_)); request.mutable_query_content()->set_text(query); request.mutable_query_content()->set_syntax(::Ydb::Query::Syntax(settings.Syntax_)); + if (sessionId) { + request.set_session_id(sessionId); + } if (settings.ConcurrentResultSets_) { request.set_concurrent_result_sets(*settings.ConcurrentResultSets_); @@ -232,7 +235,7 @@ TFuture<std::pair<TPlainStatus, TExecuteQueryProcessorPtr>> StreamExecuteQueryIm TAsyncExecuteQueryIterator TExecQueryImpl::StreamExecuteQuery(const std::shared_ptr<TGRpcConnectionsImpl>& connections, const TDbDriverStatePtr& driverState, const TString& query, const TTxControl& txControl, - const TMaybe<TParams>& params, const TExecuteQuerySettings& settings) + const TMaybe<TParams>& params, const TExecuteQuerySettings& settings, const TString& sessionId) { auto promise = NewPromise<TExecuteQueryIterator>(); @@ -251,19 +254,19 @@ TAsyncExecuteQueryIterator TExecQueryImpl::StreamExecuteQuery(const std::shared_ ? ¶ms->GetProtoMap() : nullptr; - StreamExecuteQueryImpl(connections, driverState, query, txControl, paramsProto, settings) + StreamExecuteQueryImpl(connections, driverState, query, txControl, paramsProto, settings, sessionId) .Subscribe(iteratorCallback); return promise.GetFuture(); } TAsyncExecuteQueryResult TExecQueryImpl::ExecuteQuery(const std::shared_ptr<TGRpcConnectionsImpl>& connections, const TDbDriverStatePtr& driverState, const TString& query, const TTxControl& txControl, - const TMaybe<TParams>& params, const TExecuteQuerySettings& settings) + const TMaybe<TParams>& params, const TExecuteQuerySettings& settings, const TString& sessionId) { auto syncSettings = settings; syncSettings.ConcurrentResultSets(true); - return StreamExecuteQuery(connections, driverState, query, txControl, params, syncSettings) + return StreamExecuteQuery(connections, driverState, query, txControl, params, syncSettings, sessionId) .Apply([](TAsyncExecuteQueryIterator itFuture){ auto it = itFuture.ExtractValue(); diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.h b/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.h index b7fd5b08728..24765225a70 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.h +++ b/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.h @@ -13,11 +13,11 @@ class TExecQueryImpl { public: static TAsyncExecuteQueryIterator StreamExecuteQuery(const std::shared_ptr<TGRpcConnectionsImpl>& connections, const TDbDriverStatePtr& driverState, const TString& query, const TTxControl& txControl, - const TMaybe<TParams>& params, const TExecuteQuerySettings& settings); + const TMaybe<TParams>& params, const TExecuteQuerySettings& settings, const TString& sessionId); static TAsyncExecuteQueryResult ExecuteQuery(const std::shared_ptr<TGRpcConnectionsImpl>& connections, const TDbDriverStatePtr& driverState, const TString& query, const TTxControl& txControl, - const TMaybe<TParams>& params, const TExecuteQuerySettings& settings); + const TMaybe<TParams>& params, const TExecuteQuerySettings& settings, const TString& sessionId); }; } // namespace NYdb::NQuery::NImpl diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/ya.make b/ydb/public/sdk/cpp/client/ydb_query/impl/ya.make index 969bcdd7745..23bae5bf53f 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/impl/ya.make +++ b/ydb/public/sdk/cpp/client/ydb_query/impl/ya.make @@ -3,6 +3,7 @@ LIBRARY() SRCS( exec_query.cpp exec_query.h + client_session.cpp ) PEERDIR( diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp index d6a122bfec0..478f927f6a6 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp @@ -8,28 +8,6 @@ using namespace NThreading; const TKeepAliveSettings TTableClient::TImpl::KeepAliveSettings = TKeepAliveSettings().ClientTimeout(KEEP_ALIVE_CLIENT_TIMEOUT); -bool IsSessionCloseRequested(const TStatus& status) { - const auto& meta = status.GetResponseMetadata(); - auto hints = meta.equal_range(NYdb::YDB_SERVER_HINTS); - for(auto it = hints.first; it != hints.second; ++it) { - if (it->second == NYdb::YDB_SESSION_CLOSE) { - return true; - } - } - - return false; -} - -TDuration RandomizeThreshold(TDuration duration) { - TDuration::TValue value = duration.GetValue(); - if (KEEP_ALIVE_RANDOM_FRACTION) { - const i64 randomLimit = value / KEEP_ALIVE_RANDOM_FRACTION; - if (randomLimit < 2) - return duration; - value += static_cast<i64>(RandomNumber<ui64>(randomLimit)); - } - return TDuration::FromValue(value); -} TDuration GetMinTimeToTouch(const TSessionPoolSettings& settings) { return Min(settings.CloseIdleThreshold_, settings.KeepAliveIdleThreshold_); @@ -39,14 +17,6 @@ TDuration GetMaxTimeToTouch(const TSessionPoolSettings& settings) { return Max(settings.CloseIdleThreshold_, settings.KeepAliveIdleThreshold_); } -TStatus GetStatus(const TOperation& operation) { - return operation.Status(); -} - -TStatus GetStatus(const TStatus& status) { - return status; -} - ui32 CalcBackoffTime(const TBackoffSettings& settings, ui32 retryNumber) { ui32 backoffSlots = 1 << std::min(retryNumber, settings.Ceiling_); TDuration maxDuration = settings.SlotDuration_ * backoffSlots; @@ -143,7 +113,6 @@ void TTableClient::TImpl::AsyncBackoff(const TBackoffSettings& settings, ui32 re } void TTableClient::TImpl::StartPeriodicSessionPoolTask() { - // Session pool guarantees than client is alive during call callbacks auto deletePredicate = [this](TKqpSessionCommon* s, size_t sessionsCount) { @@ -205,7 +174,7 @@ void TTableClient::TImpl::StartPeriodicSessionPoolTask() { auto timeToNextTouch = calcTimeToNextTouch(spentTime); session.SessionImpl_->ScheduleTimeToTouchFast( - RandomizeThreshold(timeToNextTouch), + NSessionPool::RandomizeThreshold(timeToNextTouch), spentTime >= maxTimeToTouch ); }; @@ -216,7 +185,7 @@ void TTableClient::TImpl::StartPeriodicSessionPoolTask() { weak, std::move(keepAliveCmd), std::move(deletePredicate) - ), PERIODIC_ACTION_INTERVAL); + ), NSessionPool::PERIODIC_ACTION_INTERVAL); } ui64 TTableClient::TImpl::ScanForeignLocations(std::shared_ptr<TTableClient::TImpl> client) { diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h index 3b73e782822..875c740a6b8 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h @@ -13,8 +13,6 @@ #include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> #include <ydb/public/api/grpc/ydb_table_v1.grpc.pb.h> -#include <util/random/random.h> - #include "client_session.h" #include "data_query.h" #include "request_migrator.h" @@ -24,73 +22,16 @@ namespace NYdb { namespace NTable { -//How often run session pool keep alive check -constexpr TDuration PERIODIC_ACTION_INTERVAL = TDuration::Seconds(5); //How ofter run host scan to perform session balancing constexpr TDuration HOSTSCAN_PERIODIC_ACTION_INTERVAL = TDuration::Seconds(2); -constexpr ui64 KEEP_ALIVE_RANDOM_FRACTION = 4; constexpr ui32 MAX_BACKOFF_DURATION_MS = TDuration::Hours(1).MilliSeconds(); constexpr TDuration KEEP_ALIVE_CLIENT_TIMEOUT = TDuration::Seconds(5); TDuration GetMinTimeToTouch(const TSessionPoolSettings& settings); TDuration GetMaxTimeToTouch(const TSessionPoolSettings& settings); ui32 CalcBackoffTime(const TBackoffSettings& settings, ui32 retryNumber); -bool IsSessionCloseRequested(const TStatus& status); -TDuration RandomizeThreshold(TDuration duration); TDuration GetMinTimeToTouch(const TSessionPoolSettings& settings); TDuration GetMaxTimeToTouch(const TSessionPoolSettings& settings); -TStatus GetStatus(const TOperation& operation); -TStatus GetStatus(const TStatus& status); - -template<typename TResponse> -NThreading::TFuture<TResponse> InjectSessionStatusInterception( - std::shared_ptr<TSession::TImpl>& impl, NThreading::TFuture<TResponse> asyncResponse, - bool updateTimeout, - TDuration timeout, - std::function<void(const TResponse&, TSession::TImpl&)> cb = {}) -{ - auto promise = NThreading::NewPromise<TResponse>(); - asyncResponse.Subscribe([impl, promise, cb, updateTimeout, timeout](NThreading::TFuture<TResponse> future) mutable { - Y_VERIFY(future.HasValue()); - - // TResponse can hold refcounted user provided data (TSession for example) - // and we do not want to have copy of it (for example it can cause delay dtor call) - // so using move semantic here is mandatory. - // Also we must reset captured shared pointer to session impl - TResponse value = std::move(future.ExtractValue()); - - const TStatus& status = GetStatus(value); - // Exclude CLIENT_RESOURCE_EXHAUSTED from transport errors which can cause to session disconnect - // since we have guarantee this request wasn't been started to execute. - - if (status.IsTransportError() && status.GetStatus() != EStatus::CLIENT_RESOURCE_EXHAUSTED) { - impl->MarkBroken(); - } else if (status.GetStatus() == EStatus::SESSION_BUSY) { - impl->MarkBroken(); - } else if (status.GetStatus() == EStatus::BAD_SESSION) { - impl->MarkBroken(); - } else if (IsSessionCloseRequested(status)) { - impl->MarkAsClosing(); - } else { - // NOTE: About GetState and lock - // Simultanious call multiple requests on the same session make no sence, due to server limitation. - // But user can perform this call, right now we do not protect session from this, it may cause - // raise on session state if respoise is not success. - // It should not be a problem - in case of this race we close session - // or put it in to settler. - if (updateTimeout && status.GetStatus() != EStatus::CLIENT_RESOURCE_EXHAUSTED) { - impl->ScheduleTimeToTouch(RandomizeThreshold(timeout), impl->GetState() == TSession::TImpl::EState::S_ACTIVE); - } - } - if (cb) { - cb(value, *impl); - } - impl.reset(); - promise.SetValue(std::move(value)); - }); - return promise.GetFuture(); -} - class TTableClient::TImpl: public TClientImplCommon<TTableClient::TImpl>, public ISessionClient { public: @@ -105,7 +46,7 @@ public: NThreading::TFuture<void> Drain(); NThreading::TFuture<void> Stop(); void ScheduleTask(const std::function<void()>& fn, TDuration timeout); - void ScheduleTaskUnsafe(std::function<void()>&& fn, TDuration timeout) override; + void ScheduleTaskUnsafe(std::function<void()>&& fn, TDuration timeout); void AsyncBackoff(const TBackoffSettings& settings, ui32 retryNumber, const std::function<void()>& fn); void StartPeriodicSessionPoolTask(); static ui64 ScanForeignLocations(std::shared_ptr<TTableClient::TImpl> client); @@ -143,7 +84,7 @@ public: CacheMissCounter.Inc(); - return InjectSessionStatusInterception(session.SessionImpl_, + return ::NYdb::NSessionPool::InjectSessionStatusInterception(session.SessionImpl_, ExecuteDataQueryInternal(session, query, txControl, params, settings, false), true, GetMinTimeToTouch(Settings_.SessionPoolSettings_)); } @@ -153,13 +94,13 @@ public: TParamsType params, const TExecDataQuerySettings& settings, bool fromCache) { TString queryKey = dataQuery.Impl_->GetTextHash(); - auto cb = [queryKey](const TDataQueryResult& result, TSession::TImpl& session) { + auto cb = [queryKey](const TDataQueryResult& result, TKqpSessionCommon& session) { if (result.GetStatus() == EStatus::NOT_FOUND) { - session.InvalidateQueryInCache(queryKey); + static_cast<TSession::TImpl&>(session).InvalidateQueryInCache(queryKey); } }; - return InjectSessionStatusInterception<TDataQueryResult>( + return ::NYdb::NSessionPool::InjectSessionStatusInterception<TDataQueryResult>( session.SessionImpl_, session.Client_->ExecuteDataQueryInternal(session, dataQuery, txControl, params, settings, fromCache), true, diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp index de3e4ebed32..035ef07168c 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp @@ -31,6 +31,7 @@ namespace NYdb { namespace NTable { using namespace NThreading; +using namespace NSessionPool; //////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.h b/ydb/public/sdk/cpp/client/ydb_table/table.h index 38765041e6d..340c99b9767 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.h +++ b/ydb/public/sdk/cpp/client/ydb_table/table.h @@ -930,6 +930,7 @@ struct TSessionPoolSettings { struct TClientSettings : public TCommonClientSettingsBase<TClientSettings> { using TSelf = TClientSettings; + using TSessionPoolSettings = TSessionPoolSettings; // Enable client query cache. Client query cache is used to map query text to // prepared query id for ExecuteDataQuery calls on client side. @@ -998,6 +999,8 @@ public: using TOperationSyncFunc = std::function<TStatus(TSession session)>; using TOperationWithoutSessionFunc = std::function<TAsyncStatus(TTableClient& tableClient)>; using TOperationWithoutSessionSyncFunc = std::function<TStatus(TTableClient& tableClient)>; + using TSettings = TClientSettings; + using TSession = TSession; public: TTableClient(const TDriver& driver, const TClientSettings& settings = TClientSettings()); diff --git a/ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp b/ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp index d4b82e3ab55..1ad11c74ea3 100644 --- a/ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp +++ b/ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp @@ -1,5 +1,6 @@ #include "ydb_common_ut.h" #include <ydb/public/sdk/cpp/client/ydb_table/table.h> +#include <ydb/public/sdk/cpp/client/ydb_query/client.h> #include <ydb/public/api/grpc/ydb_table_v1.grpc.pb.h> @@ -204,7 +205,20 @@ Y_UNIT_TEST_SUITE(YdbSdkSessions) { driver.Stop(true); } - Y_UNIT_TEST(MultiThreadSessionPoolLimitSync) { + void EnsureCanExecQuery(NYdb::NTable::TSession session) { + auto execStatus = session.ExecuteDataQuery("SELECT 42;", + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync().GetStatus(); + UNIT_ASSERT_EQUAL(execStatus, EStatus::SUCCESS); + } + + void EnsureCanExecQuery(NYdb::NQuery::TSession session) { + auto execStatus = session.ExecuteQuery("SELECT 42;", + NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync().GetStatus(); + UNIT_ASSERT_EQUAL(execStatus, EStatus::SUCCESS); + } + + template<typename TClient> + void DoMultiThreadSessionPoolLimitSync() { TKikimrWithGrpcAndRootSchema server; ui16 grpc = server.GetPort(); @@ -215,14 +229,14 @@ Y_UNIT_TEST_SUITE(YdbSdkSessions) { .SetEndpoint(location)); const int maxActiveSessions = 45; - NYdb::NTable::TTableClient client(driver, - TClientSettings() + TClient client(driver, + typename TClient::TSettings() .SessionPoolSettings( - TSessionPoolSettings().MaxActiveSessions(maxActiveSessions))); + typename TClient::TSettings::TSessionPoolSettings().MaxActiveSessions(maxActiveSessions))); constexpr int nThreads = 100; NYdb::EStatus statuses[nThreads]; - TVector<TMaybe<NYdb::NTable::TSession>> sessions; + TVector<TMaybe<typename TClient::TSession>> sessions; sessions.resize(nThreads); TAtomic t = 0; auto job = [client, &t, &statuses, &sessions]() mutable { @@ -230,9 +244,7 @@ Y_UNIT_TEST_SUITE(YdbSdkSessions) { int i = AtomicIncrement(t); statuses[--i] = sessionResponse.GetStatus(); if (statuses[i] == EStatus::SUCCESS) { - auto execStatus = sessionResponse.GetSession().ExecuteDataQuery("SELECT 42;", - TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync().GetStatus(); - UNIT_ASSERT_EQUAL(execStatus, EStatus::SUCCESS); + EnsureCanExecQuery(sessionResponse.GetSession()); sessions[i] = sessionResponse.GetSession(); } }; @@ -270,7 +282,26 @@ Y_UNIT_TEST_SUITE(YdbSdkSessions) { driver.Stop(true); } - Y_UNIT_TEST(MultiThreadMultipleRequestsOnSharedSessions) { + Y_UNIT_TEST(MultiThreadSessionPoolLimitSyncTableClient) { + DoMultiThreadSessionPoolLimitSync<NYdb::NTable::TTableClient>(); + } + + Y_UNIT_TEST(MultiThreadSessionPoolLimitSyncQueryClient) { + DoMultiThreadSessionPoolLimitSync<NYdb::NQuery::TQueryClient>(); + } + + NYdb::NTable::TAsyncDataQueryResult ExecQueryAsync(NYdb::NTable::TSession session, const TString q) { + return session.ExecuteDataQuery(q, + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()); + } + + NYdb::NQuery::TAsyncExecuteQueryResult ExecQueryAsync(NYdb::NQuery::TSession session, const TString q) { + return session.ExecuteQuery(q, + NYdb::NQuery::TTxControl::BeginTx().CommitTx()); + } + + template<typename T> + void DoMultiThreadMultipleRequestsOnSharedSessions() { TKikimrWithGrpcAndRootSchema server; ui16 grpc = server.GetPort(); @@ -281,21 +312,21 @@ Y_UNIT_TEST_SUITE(YdbSdkSessions) { .SetEndpoint(location)); const int maxActiveSessions = 10; - NYdb::NTable::TTableClient client(driver, - TClientSettings() + typename T::TClient client(driver, + typename T::TClient::TSettings() .SessionPoolSettings( - TSessionPoolSettings().MaxActiveSessions(maxActiveSessions))); + typename T::TClient::TSettings::TSessionPoolSettings().MaxActiveSessions(maxActiveSessions))); constexpr int nThreads = 20; constexpr int nRequests = 50; - std::array<TVector<TAsyncDataQueryResult>, nThreads> results; + std::array<TVector<typename T::TResult>, nThreads> results; TAtomic t = 0; TAtomic validSessions = 0; auto job = [client, &t, &results, &validSessions]() mutable { auto sessionResponse = client.GetSession().ExtractValueSync(); int i = AtomicIncrement(t); - TVector<TAsyncDataQueryResult>& r = results[--i]; + TVector<typename T::TResult>& r = results[--i]; if (sessionResponse.GetStatus() != EStatus::SUCCESS) { return; @@ -303,8 +334,7 @@ Y_UNIT_TEST_SUITE(YdbSdkSessions) { AtomicIncrement(validSessions); for (int i = 0; i < nRequests; i++) { - r.push_back(sessionResponse.GetSession().ExecuteDataQuery("SELECT 42;", - TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx())); + r.push_back(ExecQueryAsync(sessionResponse.GetSession(), "SELECT 42;")); } }; IThreadFactory* pool = SystemThreadFactory(); @@ -354,6 +384,23 @@ Y_UNIT_TEST_SUITE(YdbSdkSessions) { driver.Stop(true); } + Y_UNIT_TEST(MultiThreadMultipleRequestsOnSharedSessionsTableClient) { + struct TTypeHelper { + using TClient = NYdb::NTable::TTableClient; + using TResult = NYdb::NTable::TAsyncDataQueryResult; + }; + DoMultiThreadMultipleRequestsOnSharedSessions<TTypeHelper>(); + } + + // Enable after interactive tx support + //Y_UNIT_TEST(MultiThreadMultipleRequestsOnSharedSessionsQueryClient) { + // struct TTypeHelper { + // using TClient = NYdb::NQuery::TQueryClient; + // using TResult = NYdb::NQuery::TAsyncExecuteQueryResult; + // }; + // DoMultiThreadMultipleRequestsOnSharedSessions<TTypeHelper>(); + //} + Y_UNIT_TEST(SessionsServerLimit) { NKikimrConfig::TAppConfig appConfig; auto& tableServiceConfig = *appConfig.MutableTableServiceConfig(); |