diff options
author | dcherednik <dcherednik@ydb.tech> | 2023-07-24 13:17:59 +0300 |
---|---|---|
committer | dcherednik <dcherednik@ydb.tech> | 2023-07-24 13:17:59 +0300 |
commit | d335551d092a04a48aa7ca00f95e4bc5cb066816 (patch) | |
tree | 548d3dde259d6eeb3a1eea465b8a4164ac344a38 | |
parent | 8e9b187a00d40b1cd9ef46953fbff821c7f4869e (diff) | |
download | ydb-d335551d092a04a48aa7ca00f95e4bc5cb066816.tar.gz |
Refactoring. Move out client related functions from session pool. KIKIMR-18788
6 files changed, 130 insertions, 125 deletions
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp index a4cf92fd61..6a083f4750 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp @@ -73,11 +73,6 @@ void TKqpSessionCommon::MarkAsClosing() { } } -void TKqpSessionCommon::MarkStandalone() { - State_ = EState::S_STANDALONE; - NeedUpdateActiveCounter_ = false; -} - void TKqpSessionCommon::MarkActive() { State_ = EState::S_ACTIVE; NeedUpdateActiveCounter_ = false; diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h index 58d59b56ca..a932f7e54e 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h @@ -34,7 +34,6 @@ public: const TEndpointKey& GetEndpointKey() const; void MarkBroken(); void MarkAsClosing(); - void MarkStandalone(); void MarkActive(); void MarkIdle(); bool IsOwnedBySessionPool() const; diff --git a/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt b/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt index 21b159dc8b..a4db534a2d 100644 --- a/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt +++ b/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt @@ -1 +1 @@ -2.5.2
\ No newline at end of file +2.5.3
\ No newline at end of file diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp index 53e98fde73..980951bb19 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp @@ -7,6 +7,12 @@ namespace NTable { using namespace NThreading; +static const TStatus CLIENT_RESOURCE_EXHAUSTED_ACTIVE_SESSION_LIMIT = TStatus( + TPlainStatus( + EStatus::CLIENT_RESOURCE_EXHAUSTED, + "Active sessions limit exceeded" + ) + ); TSessionPool::TWaitersQueue::TWaitersQueue(ui32 maxQueueSize, TDuration maxWaitSessionTimeout) : MaxQueueSize_(maxQueueSize) @@ -14,25 +20,25 @@ TSessionPool::TWaitersQueue::TWaitersQueue(ui32 maxQueueSize, TDuration maxWaitS { } -bool TSessionPool::TWaitersQueue::TryPush(NThreading::TPromise<TCreateSessionResult>& p) { +bool TSessionPool::TWaitersQueue::TryPush(std::unique_ptr<IGetSessionCtx>& p) { if (Waiters_.size() < MaxQueueSize_) { - Waiters_.insert(std::make_pair(TInstant::Now(), p)); + Waiters_.insert(std::make_pair(TInstant::Now(), std::move(p))); return true; } return false; } -TMaybe<NThreading::TPromise<TCreateSessionResult>> TSessionPool::TWaitersQueue::TryGet() { +std::unique_ptr<IGetSessionCtx> TSessionPool::TWaitersQueue::TryGet() { if (Waiters_.empty()) { return {}; } auto it = Waiters_.begin(); - auto result = it->second; + auto result = std::move(it->second); Waiters_.erase(it); return result; } -void TSessionPool::TWaitersQueue::GetOld(TInstant now, TVector<NThreading::TPromise<TCreateSessionResult>>& oldWaiters) { +void TSessionPool::TWaitersQueue::GetOld(TInstant now, TVector<std::unique_ptr<IGetSessionCtx>>& oldWaiters) { auto it = Waiters_.begin(); while (it != Waiters_.end()) { if (now < it->first + MaxWaitSessionTimeout_) @@ -65,63 +71,20 @@ void TTableClient::TImpl::CloseAndDeleteSession(std::unique_ptr<TSession::TImpl> deleteSession->MarkBroken(); } -void TSessionPool::CreateFakeSession( - NThreading::TPromise<TCreateSessionResult>& promise, - std::shared_ptr<TTableClient::TImpl> client) -{ - TSession session(client, "", "", false); - // Mark standalone to prevent returning to session pool - session.SessionImpl_->MarkStandalone(); - TCreateSessionResult val( - TStatus( - TPlainStatus( - EStatus::CLIENT_RESOURCE_EXHAUSTED, - "Active sessions limit exceeded" - ) - ), - std::move(session) - ); - - client->ScheduleTaskUnsafe([promise, val{std::move(val)}]() mutable { - promise.SetValue(std::move(val)); - }, TDuration()); -} - -void TSessionPool::MakeSessionPromiseFromSession( +void TSessionPool::ReplySessionToUser( TKqpSessionCommon* session, - NThreading::TPromise<TCreateSessionResult>& promise, - std::shared_ptr<TTableClient::TImpl> client -) { + std::unique_ptr<IGetSessionCtx> ctx) +{ Y_VERIFY(session->GetState() == TSession::TImpl::S_IDLE); Y_VERIFY(!session->GetTimeInterval()); session->MarkActive(); session->SetNeedUpdateActiveCounter(true); - - TCreateSessionResult val( - TStatus(TPlainStatus()), - TSession( - client, - std::shared_ptr<TSession::TImpl>( - static_cast<TSession::TImpl*>(session), - TSession::TImpl::GetSmartDeleter(client) - ) - ) - ); - - client->ScheduleTaskUnsafe( - [promise, val{std::move(val)}]() mutable { - promise.SetValue(std::move(val)); - }, - TDuration() - ); + ctx->ReplySessionToUser(session); } -TAsyncCreateSessionResult TSessionPool::GetSession( - std::shared_ptr<TTableClient::TImpl> client, - const TCreateSessionSettings& settings) +void TSessionPool::GetSession(std::unique_ptr<IGetSessionCtx> ctx) { - auto createSessionPromise = NewPromise<TCreateSessionResult>(); - std::unique_ptr<TSession::TImpl> sessionImpl; + std::unique_ptr<TKqpSessionCommon> sessionImpl; enum class TSessionSource { Pool, Waiter, @@ -133,7 +96,7 @@ TAsyncCreateSessionResult TSessionPool::GetSession( if (MaxActiveSessions_ == 0 || ActiveSessions_ < MaxActiveSessions_) { IncrementActiveCounterUnsafe(); - } else if (WaitersQueue_.TryPush(createSessionPromise)) { + } else if (WaitersQueue_.TryPush(ctx)) { sessionSource = TSessionSource::Waiter; } else { sessionSource = TSessionSource::Error; @@ -143,33 +106,31 @@ TAsyncCreateSessionResult TSessionPool::GetSession( sessionImpl = std::move(it->second); Sessions_.erase(it); } - + UpdateStats(); } + if (sessionSource == TSessionSource::Waiter) { // Nothing to do here } else if (sessionSource == TSessionSource::Error) { FakeSessionsCounter_.Inc(); - CreateFakeSession(createSessionPromise, client); + ctx->ReplyError(CLIENT_RESOURCE_EXHAUSTED_ACTIVE_SESSION_LIMIT); } else if (sessionImpl) { - MakeSessionPromiseFromSession(sessionImpl.release(), createSessionPromise, client); + ReplySessionToUser(sessionImpl.release(), std::move(ctx)); } else { - const auto& sessionResult = client->CreateSession(settings, false); - sessionResult.Subscribe(TSession::TImpl::GetSessionInspector(createSessionPromise, client, settings, 0, true)); + ctx->ReplyNewSession(); } - - return createSessionPromise.GetFuture(); } -bool TSessionPool::CheckAndFeedWaiterNewSession(std::shared_ptr<TTableClient::TImpl> client, bool active) { - NThreading::TPromise<TCreateSessionResult> createSessionPromise; +bool TSessionPool::CheckAndFeedWaiterNewSession(bool active) { + std::unique_ptr<IGetSessionCtx> getSessionCtx; { std::lock_guard guard(Mtx_); if (Closed_) return false; - if (auto prom = WaitersQueue_.TryGet()) { - createSessionPromise = std::move(*prom); + if (auto maybeCtx = WaitersQueue_.TryGet()) { + getSessionCtx = std::move(maybeCtx); } else { return false; } @@ -186,32 +147,24 @@ bool TSessionPool::CheckAndFeedWaiterNewSession(std::shared_ptr<TTableClient::TI // The above mentioned conditions is quite rare so // we can just return an error. In this case uplevel code should // start retry. - CreateFakeSession(createSessionPromise, client); + getSessionCtx->ReplyError(CLIENT_RESOURCE_EXHAUSTED_ACTIVE_SESSION_LIMIT); return true; } - // This code can be called from client dtors. It may be unsafe to - // execute grpc call directly... - client->ScheduleTaskUnsafe([createSessionPromise, client]() mutable { - TCreateSessionSettings settings; - settings.ClientTimeout(CREATE_SESSION_INTERNAL_TIMEOUT); - - const auto& sessionResult = client->CreateSession(settings, false); - sessionResult.Subscribe(TSession::TImpl::GetSessionInspector(createSessionPromise, client, settings, 0, true)); - }, TDuration()); + getSessionCtx->ReplyNewSession(); return true; } -bool TSessionPool::ReturnSession(TKqpSessionCommon* impl, bool active, std::shared_ptr<TTableClient::TImpl> client) { - // Do not set promise under the session pool lock - NThreading::TPromise<TCreateSessionResult> createSessionPromise; +bool TSessionPool::ReturnSession(TKqpSessionCommon* impl, bool active) { + // Do not call ReplySessionToUser under the session pool lock + std::unique_ptr<IGetSessionCtx> getSessionCtx; { std::lock_guard guard(Mtx_); if (Closed_) return false; - if (auto prom = WaitersQueue_.TryGet()) { - createSessionPromise = std::move(*prom); + if (auto maybeCtx = WaitersQueue_.TryGet()) { + getSessionCtx = std::move(maybeCtx); if (!active) IncrementActiveCounterUnsafe(); } else { @@ -228,8 +181,8 @@ bool TSessionPool::ReturnSession(TKqpSessionCommon* impl, bool active, std::shar UpdateStats(); } - if (createSessionPromise.Initialized()) { - MakeSessionPromiseFromSession(impl, createSessionPromise, client); + if (getSessionCtx) { + ReplySessionToUser(impl, std::move(getSessionCtx)); } return true; @@ -278,7 +231,7 @@ TPeriodicCb TSessionPool::CreatePeriodicTask(std::weak_ptr<TTableClient::TImpl> sessionsToTouch.reserve(keepAliveBatchSize); TVector<std::unique_ptr<TSession::TImpl>> sessionsToDelete; sessionsToDelete.reserve(keepAliveBatchSize); - TVector<NThreading::TPromise<TCreateSessionResult>> waitersToReplyError; + TVector<std::unique_ptr<IGetSessionCtx>> waitersToReplyError; waitersToReplyError.reserve(keepAliveBatchSize); const auto now = TInstant::Now(); { @@ -324,7 +277,7 @@ TPeriodicCb TSessionPool::CreatePeriodicTask(std::weak_ptr<TTableClient::TImpl> for (auto& waiter : waitersToReplyError) { FakeSessionsCounter_.Inc(); - CreateFakeSession(waiter, strongClient); + waiter->ReplyError(CLIENT_RESOURCE_EXHAUSTED_ACTIVE_SESSION_LIMIT); } } diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.h b/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.h index 702fe185f7..a37df781b6 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.h +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.h @@ -8,50 +8,62 @@ namespace NYdb { + +namespace NSessionPool { + +class IGetSessionCtx : private TNonCopyable { +public: + virtual ~IGetSessionCtx() = default; + // Transfer ownership to ctx + virtual void ReplySessionToUser(TKqpSessionCommon* session) = 0; + virtual void ReplyError(TStatus status) = 0; + virtual void ReplyNewSession() = 0; +}; + +} + namespace NTable { using namespace NThreading; - +using namespace NSessionPool; constexpr TDuration MAX_WAIT_SESSION_TIMEOUT = TDuration::Seconds(5); //Max time to wait session constexpr ui64 PERIODIC_ACTION_BATCH_SIZE = 10; //Max number of tasks to perform during one interval constexpr TDuration CREATE_SESSION_INTERNAL_TIMEOUT = TDuration::Seconds(2); //Timeout for createSession call inside session pool - class TSessionPool { - typedef TAsyncCreateSessionResult - (*TAwareSessonProvider) - (std::shared_ptr<TTableClient::TImpl> client, const TCreateSessionSettings& settings); private: class TWaitersQueue { public: TWaitersQueue(ui32 maxQueueSize, TDuration maxWaitSessionTimeout=MAX_WAIT_SESSION_TIMEOUT); - bool TryPush(NThreading::TPromise<TCreateSessionResult>& p); - TMaybe<NThreading::TPromise<TCreateSessionResult>> TryGet(); - void GetOld(TInstant now, TVector<NThreading::TPromise<TCreateSessionResult>>& oldWaiters); + // returns true and gets ownership if queue size less than limit + // otherwise returns false and doesn't not touch ctx + bool TryPush(std::unique_ptr<IGetSessionCtx>& p); + std::unique_ptr<IGetSessionCtx> TryGet(); + void GetOld(TInstant now, TVector<std::unique_ptr<IGetSessionCtx>>& oldWaiters); ui32 Size() const; private: const ui32 MaxQueueSize_; const TDuration MaxWaitSessionTimeout_; - TMultiMap<TInstant, NThreading::TPromise<TCreateSessionResult>> Waiters_; + TMultiMap<TInstant, std::unique_ptr<IGetSessionCtx>> Waiters_; }; public: using TKeepAliveCmd = std::function<void(TSession session)>; using TDeletePredicate = std::function<bool(TSession::TImpl* session, TTableClient::TImpl* client, size_t sessionsCount)>; TSessionPool(ui32 maxActiveSessions); - // TAwareSessonProvider: - // function is called if session pool is empty, - // this is used for additional total session count limitation - TAsyncCreateSessionResult GetSession( - std::shared_ptr<TTableClient::TImpl> client, - const TCreateSessionSettings& settings); + + // Extracts session from pool or creates new one ising given ctx + void GetSession(std::unique_ptr<IGetSessionCtx> ctx); + // Returns true if session returned to pool successfully - bool ReturnSession(TKqpSessionCommon* impl, bool active, std::shared_ptr<TTableClient::TImpl> client); + bool ReturnSession(TKqpSessionCommon* impl, bool active); + // Returns trun if has waiter and scheduled to create new session // too feed it - bool CheckAndFeedWaiterNewSession(std::shared_ptr<TTableClient::TImpl> client, bool active); + bool CheckAndFeedWaiterNewSession(bool active); + TPeriodicCb CreatePeriodicTask(std::weak_ptr<TTableClient::TImpl> weakClient, TKeepAliveCmd&& cmd, TDeletePredicate&& predicate); i64 GetActiveSessions() const; i64 GetActiveSessionsLimit() const; @@ -62,15 +74,9 @@ public: void Drain(std::function<bool(std::unique_ptr<TSession::TImpl>&&)> cb, bool close); void SetStatCollector(NSdkStats::TStatCollector::TSessionPoolStatCollector collector); - static void CreateFakeSession(NThreading::TPromise<TCreateSessionResult>& promise, - std::shared_ptr<TTableClient::TImpl> client); private: void UpdateStats(); - void MakeSessionPromiseFromSession( - TKqpSessionCommon* session, - NThreading::TPromise<TCreateSessionResult>& promise, - std::shared_ptr<TTableClient::TImpl> client - ); + static void ReplySessionToUser(TKqpSessionCommon* session, std::unique_ptr<IGetSessionCtx> ctx); mutable std::mutex Mtx_; bool Closed_; diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp index 3ddb4495c4..daece9edf9 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp @@ -1,15 +1,13 @@ #include "table_client.h" - namespace NYdb { + namespace NTable { using namespace NThreading; - const TKeepAliveSettings TTableClient::TImpl::KeepAliveSettings = TKeepAliveSettings().ClientTimeout(KEEP_ALIVE_CLIENT_TIMEOUT); - bool IsSessionCloseRequested(const TStatus& status) { const auto& meta = status.GetResponseMetadata(); auto hints = meta.equal_range(NYdb::YDB_SERVER_HINTS); @@ -341,7 +339,63 @@ void TTableClient::TImpl::StartPeriodicHostScanTask() { } TAsyncCreateSessionResult TTableClient::TImpl::GetSession(const TCreateSessionSettings& settings) { - return SessionPool_.GetSession(shared_from_this(), settings); + using namespace NSessionPool; + + class TTableClientGetSessionCtx : public IGetSessionCtx { + public: + TTableClientGetSessionCtx(std::shared_ptr<TTableClient::TImpl> client, TDuration clientTimeout) + : Promise(NewPromise<TCreateSessionResult>()) + , Client(client) + , ClientTimeout(clientTimeout) + {} + + TAsyncCreateSessionResult GetFuture() { + return Promise.GetFuture(); + } + + void ReplyError(TStatus status) override { + TSession session(Client, "", "", false); + ScheduleReply(TCreateSessionResult(std::move(status), std::move(session))); + } + + void ReplySessionToUser(TKqpSessionCommon* session) override { + TCreateSessionResult val( + TStatus(TPlainStatus()), + TSession( + Client, + std::shared_ptr<TSession::TImpl>( + static_cast<TSession::TImpl*>(session), + TSession::TImpl::GetSmartDeleter(Client) + ) + ) + ); + + ScheduleReply(std::move(val)); + } + + void ReplyNewSession() override { + TCreateSessionSettings settings; + settings.ClientTimeout(ClientTimeout); + const auto& sessionResult = Client->CreateSession(settings, false); + sessionResult.Subscribe(TSession::TImpl::GetSessionInspector(Promise, Client, settings, 0, true)); + } + + private: + void ScheduleReply(TCreateSessionResult val) { + //TODO: Do we realy need it? + Client->ScheduleTaskUnsafe([promise{std::move(Promise)}, val{std::move(val)}]() mutable { + promise.SetValue(std::move(val)); + }, TDuration()); + } + NThreading::TPromise<TCreateSessionResult> Promise; + std::shared_ptr<TTableClient::TImpl> Client; + const TDuration ClientTimeout; + }; + + auto ctx = std::make_unique<TTableClientGetSessionCtx>(shared_from_this(), settings.ClientTimeout_); + auto future = ctx->GetFuture(); + SessionPool_.GetSession(std::move(ctx)); + return future; } i64 TTableClient::TImpl::GetActiveSessionCount() const { @@ -374,9 +428,7 @@ TAsyncCreateSessionResult TTableClient::TImpl::CreateSession(const TCreateSessio } auto session = TSession(self, result.session_id(), status.Endpoint, !standalone); if (status.Ok()) { - if (standalone) { - session.SessionImpl_->MarkStandalone(); - } else { + if (!standalone) { session.SessionImpl_->MarkActive(); } self->DbDriverState_->StatCollector.IncSessionsOnHost(status.Endpoint); @@ -860,7 +912,7 @@ bool TTableClient::TImpl::ReturnSession(TKqpSessionCommon* sessionImpl) { // Also removes NeedUpdateActiveCounter flag sessionImpl->MarkIdle(); sessionImpl->SetTimeInterval(TDuration::Zero()); - if (!SessionPool_.ReturnSession(sessionImpl, needUpdateCounter, shared_from_this())) { + if (!SessionPool_.ReturnSession(sessionImpl, needUpdateCounter)) { sessionImpl->SetNeedUpdateActiveCounter(needUpdateCounter); return false; } @@ -870,7 +922,7 @@ bool TTableClient::TImpl::ReturnSession(TKqpSessionCommon* sessionImpl) { void TTableClient::TImpl::DeleteSession(TKqpSessionCommon* sessionImpl) { // Closing not owned by session pool session should not fire getting new session if (sessionImpl->IsOwnedBySessionPool()) { - if (SessionPool_.CheckAndFeedWaiterNewSession(shared_from_this(), sessionImpl->NeedUpdateActiveCounter())) { + if (SessionPool_.CheckAndFeedWaiterNewSession(sessionImpl->NeedUpdateActiveCounter())) { // We requested new session for waiter which already incremented // active session counter and old session will be deleted // - skip update active counter in this case |