diff options
author | dcherednik <dcherednik@ydb.tech> | 2023-07-25 20:35:21 +0300 |
---|---|---|
committer | root <root@qavm-2ed34686.qemu> | 2023-07-25 20:35:21 +0300 |
commit | 4201699ddd835c304f1196cc470aaef6c899328a (patch) | |
tree | 1ba99a4c57a39c1c046f84ff011868837c83c19a | |
parent | 54799eaba9e7338773883367169f28cd1e9b7799 (diff) | |
download | ydb-4201699ddd835c304f1196cc470aaef6c899328a.tar.gz |
Refactoring. Use common KqpSession inside session pool. KIKIMR-18788
5 files changed, 41 insertions, 41 deletions
diff --git a/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt b/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt index a4db534a2d..d21aa93ccd 100644 --- a/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt +++ b/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt @@ -1 +1 @@ -2.5.3
\ No newline at end of file +2.5.4
\ No newline at end of file diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp index 980951bb19..da9df168c2 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp @@ -1,6 +1,8 @@ #include "session_pool.h" -#include "table_client.h" +#define INCLUDE_YDB_INTERNAL_H +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/plain_status/status.h> +#undef INCLUDE_YDB_INTERNAL_H namespace NYdb { namespace NTable { @@ -62,20 +64,19 @@ TSessionPool::TSessionPool(ui32 maxActiveSessions) , MaxActiveSessions_(maxActiveSessions) {} -void TTableClient::TImpl::CloseAndDeleteSession(std::unique_ptr<TSession::TImpl>&& impl, - std::shared_ptr<TTableClient::TImpl> client) { - std::shared_ptr<TSession::TImpl> deleteSession( - impl.release(), - TSession::TImpl::GetSmartDeleter(client)); - - deleteSession->MarkBroken(); +static void CloseAndDeleteSession(std::unique_ptr<TKqpSessionCommon>&& impl, + std::shared_ptr<ISessionClient> client) { + auto deleter = TKqpSessionCommon::GetSmartDeleter(client); + TKqpSessionCommon* p = impl.release(); + p->MarkBroken(); + deleter(p); } void TSessionPool::ReplySessionToUser( TKqpSessionCommon* session, std::unique_ptr<IGetSessionCtx> ctx) { - Y_VERIFY(session->GetState() == TSession::TImpl::S_IDLE); + Y_VERIFY(session->GetState() == TKqpSessionCommon::S_IDLE); Y_VERIFY(!session->GetTimeInterval()); session->MarkActive(); session->SetNeedUpdateActiveCounter(true); @@ -170,7 +171,7 @@ bool TSessionPool::ReturnSession(TKqpSessionCommon* impl, bool active) { } else { Sessions_.emplace(std::make_pair( impl->GetTimeToTouchFast(), - static_cast<TSession::TImpl*>(impl))); + impl)); if (active) { Y_VERIFY(ActiveSessions_); @@ -200,7 +201,7 @@ void TSessionPool::IncrementActiveCounterUnsafe() { UpdateStats(); } -void TSessionPool::Drain(std::function<bool(std::unique_ptr<TSession::TImpl>&&)> cb, bool close) { +void TSessionPool::Drain(std::function<bool(std::unique_ptr<TKqpSessionCommon>&&)> cb, bool close) { std::lock_guard guard(Mtx_); Closed_ = close; for (auto it = Sessions_.begin(); it != Sessions_.end();) { @@ -212,7 +213,7 @@ void TSessionPool::Drain(std::function<bool(std::unique_ptr<TSession::TImpl>&&)> UpdateStats(); } -TPeriodicCb TSessionPool::CreatePeriodicTask(std::weak_ptr<TTableClient::TImpl> weakClient, +TPeriodicCb TSessionPool::CreatePeriodicTask(std::weak_ptr<ISessionClient> weakClient, TKeepAliveCmd&& cmd, TDeletePredicate&& deletePredicate) { auto periodicCb = [this, weakClient, cmd=std::move(cmd), deletePredicate=std::move(deletePredicate)](NYql::TIssues&&, EStatus status) { @@ -227,9 +228,9 @@ TPeriodicCb TSessionPool::CreatePeriodicTask(std::weak_ptr<TTableClient::TImpl> return false; } else { auto keepAliveBatchSize = PERIODIC_ACTION_BATCH_SIZE; - TVector<std::unique_ptr<TSession::TImpl>> sessionsToTouch; + TVector<std::unique_ptr<TKqpSessionCommon>> sessionsToTouch; sessionsToTouch.reserve(keepAliveBatchSize); - TVector<std::unique_ptr<TSession::TImpl>> sessionsToDelete; + TVector<std::unique_ptr<TKqpSessionCommon>> sessionsToDelete; sessionsToDelete.reserve(keepAliveBatchSize); TVector<std::unique_ptr<IGetSessionCtx>> waitersToReplyError; waitersToReplyError.reserve(keepAliveBatchSize); @@ -244,7 +245,7 @@ TPeriodicCb TSessionPool::CreatePeriodicTask(std::weak_ptr<TTableClient::TImpl> if (now < it->second->GetTimeToTouchFast()) break; - if (deletePredicate(it->second.get(), strongClient.get(), sessions.size())) { + if (deletePredicate(it->second.get(), sessions.size())) { sessionsToDelete.emplace_back(std::move(it->second)); } else { sessionsToTouch.emplace_back(std::move(it->second)); @@ -260,18 +261,15 @@ TPeriodicCb TSessionPool::CreatePeriodicTask(std::weak_ptr<TTableClient::TImpl> for (auto& sessionImpl : sessionsToTouch) { if (sessionImpl) { - Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_IDLE); - TSession session(strongClient, std::shared_ptr<TSession::TImpl>( - sessionImpl.release(), - TSession::TImpl::GetSmartDeleter(strongClient))); - cmd(session); + Y_VERIFY(sessionImpl->GetState() == TKqpSessionCommon::S_IDLE); + cmd(sessionImpl.release()); } } for (auto& sessionImpl : sessionsToDelete) { if (sessionImpl) { - Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_IDLE); - TTableClient::TImpl::CloseAndDeleteSession(std::move(sessionImpl), strongClient); + Y_VERIFY(sessionImpl->GetState() == TKqpSessionCommon::S_IDLE); + CloseAndDeleteSession(std::move(sessionImpl), strongClient); } } diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.h b/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.h index a37df781b6..61da9aa2ee 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.h +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.h @@ -1,12 +1,10 @@ #pragma once -#include "client_session.h" - +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h> #include <ydb/public/sdk/cpp/client/ydb_types/core_facility/core_facility.h> #include <util/generic/map.h> - namespace NYdb { namespace NSessionPool { @@ -50,8 +48,8 @@ private: TMultiMap<TInstant, std::unique_ptr<IGetSessionCtx>> Waiters_; }; public: - using TKeepAliveCmd = std::function<void(TSession session)>; - using TDeletePredicate = std::function<bool(TSession::TImpl* session, TTableClient::TImpl* client, size_t sessionsCount)>; + using TKeepAliveCmd = std::function<void(TKqpSessionCommon* s)>; + using TDeletePredicate = std::function<bool(TKqpSessionCommon* s, size_t sessionsCount)>; TSessionPool(ui32 maxActiveSessions); // Extracts session from pool or creates new one ising given ctx @@ -64,14 +62,14 @@ public: // too feed it bool CheckAndFeedWaiterNewSession(bool active); - TPeriodicCb CreatePeriodicTask(std::weak_ptr<TTableClient::TImpl> weakClient, TKeepAliveCmd&& cmd, TDeletePredicate&& predicate); + TPeriodicCb CreatePeriodicTask(std::weak_ptr<ISessionClient> weakClient, TKeepAliveCmd&& cmd, TDeletePredicate&& predicate); i64 GetActiveSessions() const; i64 GetActiveSessionsLimit() const; i64 GetCurrentPoolSize() const; void DecrementActiveCounter(); void IncrementActiveCounterUnsafe(); - void Drain(std::function<bool(std::unique_ptr<TSession::TImpl>&&)> cb, bool close); + void Drain(std::function<bool(std::unique_ptr<TKqpSessionCommon>&&)> cb, bool close); void SetStatCollector(NSdkStats::TStatCollector::TSessionPoolStatCollector collector); private: @@ -81,7 +79,7 @@ private: mutable std::mutex Mtx_; bool Closed_; - TMultiMap<TInstant, std::unique_ptr<TSession::TImpl>> Sessions_; + TMultiMap<TInstant, std::unique_ptr<TKqpSessionCommon>> Sessions_; TWaitersQueue WaitersQueue_; i64 ActiveSessions_; diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp index daece9edf9..d6a122bfec 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp @@ -100,10 +100,10 @@ void TTableClient::TImpl::InitStopper() { } NThreading::TFuture<void> TTableClient::TImpl::Drain() { - TVector<std::unique_ptr<TSession::TImpl>> sessions; + TVector<std::unique_ptr<TKqpSessionCommon>> sessions; // No realocations under lock sessions.reserve(Settings_.SessionPoolSettings_.MaxActiveSessions_); - auto drainer = [&sessions](std::unique_ptr<TSession::TImpl>&& impl) mutable { + auto drainer = [&sessions](std::unique_ptr<TKqpSessionCommon>&& impl) mutable { sessions.push_back(std::move(impl)); return true; }; @@ -144,10 +144,11 @@ void TTableClient::TImpl::AsyncBackoff(const TBackoffSettings& settings, ui32 re void TTableClient::TImpl::StartPeriodicSessionPoolTask() { - auto deletePredicate = [](TSession::TImpl* session, TTableClient::TImpl* client, size_t sessionsCount) { + // Session pool guarantees than client is alive during call callbacks + auto deletePredicate = [this](TKqpSessionCommon* s, size_t sessionsCount) { - const auto sessionPoolSettings = client->Settings_.SessionPoolSettings_; - const auto spentTime = session->GetTimeToTouchFast() - session->GetTimeInPastFast(); + const auto& sessionPoolSettings = Settings_.SessionPoolSettings_; + const auto spentTime = s->GetTimeToTouchFast() - s->GetTimeInPastFast(); if (spentTime >= sessionPoolSettings.CloseIdleThreshold_) { if (sessionsCount > sessionPoolSettings.MinPoolSize_) { @@ -158,7 +159,14 @@ void TTableClient::TImpl::StartPeriodicSessionPoolTask() { return false; }; - auto keepAliveCmd = [](TSession session) { + auto keepAliveCmd = [this](TKqpSessionCommon* s) { + auto strongClient = shared_from_this(); + TSession session( + strongClient, + std::shared_ptr<TSession::TImpl>( + static_cast<TSession::TImpl*>(s), + TSession::TImpl::GetSmartDeleter(strongClient) + )); Y_VERIFY(session.GetId()); diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h index d9c9161f19..6d4c3aec1e 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h @@ -201,10 +201,6 @@ public: bool ReturnSession(TKqpSessionCommon* sessionImpl) override; void DeleteSession(TKqpSessionCommon* sessionImpl) override; ui32 GetSessionRetryLimit() const; - static void CloseAndDeleteSession( - std::unique_ptr<TSession::TImpl>&& impl, - std::shared_ptr<TTableClient::TImpl> client); - void SetStatCollector(const NSdkStats::TStatCollector::TClientStatCollector& collector); |