aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordcherednik <dcherednik@ydb.tech>2023-07-25 20:35:21 +0300
committerroot <root@qavm-2ed34686.qemu>2023-07-25 20:35:21 +0300
commit4201699ddd835c304f1196cc470aaef6c899328a (patch)
tree1ba99a4c57a39c1c046f84ff011868837c83c19a
parent54799eaba9e7338773883367169f28cd1e9b7799 (diff)
downloadydb-4201699ddd835c304f1196cc470aaef6c899328a.tar.gz
Refactoring. Use common KqpSession inside session pool. KIKIMR-18788
-rw-r--r--ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp42
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.h14
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp20
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h4
5 files changed, 41 insertions, 41 deletions
diff --git a/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt b/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt
index a4db534a2d..d21aa93ccd 100644
--- a/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt
+++ b/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt
@@ -1 +1 @@
-2.5.3 \ No newline at end of file
+2.5.4 \ No newline at end of file
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp
index 980951bb19..da9df168c2 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp
@@ -1,6 +1,8 @@
#include "session_pool.h"
-#include "table_client.h"
+#define INCLUDE_YDB_INTERNAL_H
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/plain_status/status.h>
+#undef INCLUDE_YDB_INTERNAL_H
namespace NYdb {
namespace NTable {
@@ -62,20 +64,19 @@ TSessionPool::TSessionPool(ui32 maxActiveSessions)
, MaxActiveSessions_(maxActiveSessions)
{}
-void TTableClient::TImpl::CloseAndDeleteSession(std::unique_ptr<TSession::TImpl>&& impl,
- std::shared_ptr<TTableClient::TImpl> client) {
- std::shared_ptr<TSession::TImpl> deleteSession(
- impl.release(),
- TSession::TImpl::GetSmartDeleter(client));
-
- deleteSession->MarkBroken();
+static void CloseAndDeleteSession(std::unique_ptr<TKqpSessionCommon>&& impl,
+ std::shared_ptr<ISessionClient> client) {
+ auto deleter = TKqpSessionCommon::GetSmartDeleter(client);
+ TKqpSessionCommon* p = impl.release();
+ p->MarkBroken();
+ deleter(p);
}
void TSessionPool::ReplySessionToUser(
TKqpSessionCommon* session,
std::unique_ptr<IGetSessionCtx> ctx)
{
- Y_VERIFY(session->GetState() == TSession::TImpl::S_IDLE);
+ Y_VERIFY(session->GetState() == TKqpSessionCommon::S_IDLE);
Y_VERIFY(!session->GetTimeInterval());
session->MarkActive();
session->SetNeedUpdateActiveCounter(true);
@@ -170,7 +171,7 @@ bool TSessionPool::ReturnSession(TKqpSessionCommon* impl, bool active) {
} else {
Sessions_.emplace(std::make_pair(
impl->GetTimeToTouchFast(),
- static_cast<TSession::TImpl*>(impl)));
+ impl));
if (active) {
Y_VERIFY(ActiveSessions_);
@@ -200,7 +201,7 @@ void TSessionPool::IncrementActiveCounterUnsafe() {
UpdateStats();
}
-void TSessionPool::Drain(std::function<bool(std::unique_ptr<TSession::TImpl>&&)> cb, bool close) {
+void TSessionPool::Drain(std::function<bool(std::unique_ptr<TKqpSessionCommon>&&)> cb, bool close) {
std::lock_guard guard(Mtx_);
Closed_ = close;
for (auto it = Sessions_.begin(); it != Sessions_.end();) {
@@ -212,7 +213,7 @@ void TSessionPool::Drain(std::function<bool(std::unique_ptr<TSession::TImpl>&&)>
UpdateStats();
}
-TPeriodicCb TSessionPool::CreatePeriodicTask(std::weak_ptr<TTableClient::TImpl> weakClient,
+TPeriodicCb TSessionPool::CreatePeriodicTask(std::weak_ptr<ISessionClient> weakClient,
TKeepAliveCmd&& cmd, TDeletePredicate&& deletePredicate)
{
auto periodicCb = [this, weakClient, cmd=std::move(cmd), deletePredicate=std::move(deletePredicate)](NYql::TIssues&&, EStatus status) {
@@ -227,9 +228,9 @@ TPeriodicCb TSessionPool::CreatePeriodicTask(std::weak_ptr<TTableClient::TImpl>
return false;
} else {
auto keepAliveBatchSize = PERIODIC_ACTION_BATCH_SIZE;
- TVector<std::unique_ptr<TSession::TImpl>> sessionsToTouch;
+ TVector<std::unique_ptr<TKqpSessionCommon>> sessionsToTouch;
sessionsToTouch.reserve(keepAliveBatchSize);
- TVector<std::unique_ptr<TSession::TImpl>> sessionsToDelete;
+ TVector<std::unique_ptr<TKqpSessionCommon>> sessionsToDelete;
sessionsToDelete.reserve(keepAliveBatchSize);
TVector<std::unique_ptr<IGetSessionCtx>> waitersToReplyError;
waitersToReplyError.reserve(keepAliveBatchSize);
@@ -244,7 +245,7 @@ TPeriodicCb TSessionPool::CreatePeriodicTask(std::weak_ptr<TTableClient::TImpl>
if (now < it->second->GetTimeToTouchFast())
break;
- if (deletePredicate(it->second.get(), strongClient.get(), sessions.size())) {
+ if (deletePredicate(it->second.get(), sessions.size())) {
sessionsToDelete.emplace_back(std::move(it->second));
} else {
sessionsToTouch.emplace_back(std::move(it->second));
@@ -260,18 +261,15 @@ TPeriodicCb TSessionPool::CreatePeriodicTask(std::weak_ptr<TTableClient::TImpl>
for (auto& sessionImpl : sessionsToTouch) {
if (sessionImpl) {
- Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_IDLE);
- TSession session(strongClient, std::shared_ptr<TSession::TImpl>(
- sessionImpl.release(),
- TSession::TImpl::GetSmartDeleter(strongClient)));
- cmd(session);
+ Y_VERIFY(sessionImpl->GetState() == TKqpSessionCommon::S_IDLE);
+ cmd(sessionImpl.release());
}
}
for (auto& sessionImpl : sessionsToDelete) {
if (sessionImpl) {
- Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_IDLE);
- TTableClient::TImpl::CloseAndDeleteSession(std::move(sessionImpl), strongClient);
+ Y_VERIFY(sessionImpl->GetState() == TKqpSessionCommon::S_IDLE);
+ CloseAndDeleteSession(std::move(sessionImpl), strongClient);
}
}
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.h b/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.h
index a37df781b6..61da9aa2ee 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.h
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.h
@@ -1,12 +1,10 @@
#pragma once
-#include "client_session.h"
-
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h>
#include <ydb/public/sdk/cpp/client/ydb_types/core_facility/core_facility.h>
#include <util/generic/map.h>
-
namespace NYdb {
namespace NSessionPool {
@@ -50,8 +48,8 @@ private:
TMultiMap<TInstant, std::unique_ptr<IGetSessionCtx>> Waiters_;
};
public:
- using TKeepAliveCmd = std::function<void(TSession session)>;
- using TDeletePredicate = std::function<bool(TSession::TImpl* session, TTableClient::TImpl* client, size_t sessionsCount)>;
+ using TKeepAliveCmd = std::function<void(TKqpSessionCommon* s)>;
+ using TDeletePredicate = std::function<bool(TKqpSessionCommon* s, size_t sessionsCount)>;
TSessionPool(ui32 maxActiveSessions);
// Extracts session from pool or creates new one ising given ctx
@@ -64,14 +62,14 @@ public:
// too feed it
bool CheckAndFeedWaiterNewSession(bool active);
- TPeriodicCb CreatePeriodicTask(std::weak_ptr<TTableClient::TImpl> weakClient, TKeepAliveCmd&& cmd, TDeletePredicate&& predicate);
+ TPeriodicCb CreatePeriodicTask(std::weak_ptr<ISessionClient> weakClient, TKeepAliveCmd&& cmd, TDeletePredicate&& predicate);
i64 GetActiveSessions() const;
i64 GetActiveSessionsLimit() const;
i64 GetCurrentPoolSize() const;
void DecrementActiveCounter();
void IncrementActiveCounterUnsafe();
- void Drain(std::function<bool(std::unique_ptr<TSession::TImpl>&&)> cb, bool close);
+ void Drain(std::function<bool(std::unique_ptr<TKqpSessionCommon>&&)> cb, bool close);
void SetStatCollector(NSdkStats::TStatCollector::TSessionPoolStatCollector collector);
private:
@@ -81,7 +79,7 @@ private:
mutable std::mutex Mtx_;
bool Closed_;
- TMultiMap<TInstant, std::unique_ptr<TSession::TImpl>> Sessions_;
+ TMultiMap<TInstant, std::unique_ptr<TKqpSessionCommon>> Sessions_;
TWaitersQueue WaitersQueue_;
i64 ActiveSessions_;
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp
index daece9edf9..d6a122bfec 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp
@@ -100,10 +100,10 @@ void TTableClient::TImpl::InitStopper() {
}
NThreading::TFuture<void> TTableClient::TImpl::Drain() {
- TVector<std::unique_ptr<TSession::TImpl>> sessions;
+ TVector<std::unique_ptr<TKqpSessionCommon>> sessions;
// No realocations under lock
sessions.reserve(Settings_.SessionPoolSettings_.MaxActiveSessions_);
- auto drainer = [&sessions](std::unique_ptr<TSession::TImpl>&& impl) mutable {
+ auto drainer = [&sessions](std::unique_ptr<TKqpSessionCommon>&& impl) mutable {
sessions.push_back(std::move(impl));
return true;
};
@@ -144,10 +144,11 @@ void TTableClient::TImpl::AsyncBackoff(const TBackoffSettings& settings, ui32 re
void TTableClient::TImpl::StartPeriodicSessionPoolTask() {
- auto deletePredicate = [](TSession::TImpl* session, TTableClient::TImpl* client, size_t sessionsCount) {
+ // Session pool guarantees than client is alive during call callbacks
+ auto deletePredicate = [this](TKqpSessionCommon* s, size_t sessionsCount) {
- const auto sessionPoolSettings = client->Settings_.SessionPoolSettings_;
- const auto spentTime = session->GetTimeToTouchFast() - session->GetTimeInPastFast();
+ const auto& sessionPoolSettings = Settings_.SessionPoolSettings_;
+ const auto spentTime = s->GetTimeToTouchFast() - s->GetTimeInPastFast();
if (spentTime >= sessionPoolSettings.CloseIdleThreshold_) {
if (sessionsCount > sessionPoolSettings.MinPoolSize_) {
@@ -158,7 +159,14 @@ void TTableClient::TImpl::StartPeriodicSessionPoolTask() {
return false;
};
- auto keepAliveCmd = [](TSession session) {
+ auto keepAliveCmd = [this](TKqpSessionCommon* s) {
+ auto strongClient = shared_from_this();
+ TSession session(
+ strongClient,
+ std::shared_ptr<TSession::TImpl>(
+ static_cast<TSession::TImpl*>(s),
+ TSession::TImpl::GetSmartDeleter(strongClient)
+ ));
Y_VERIFY(session.GetId());
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h
index d9c9161f19..6d4c3aec1e 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h
@@ -201,10 +201,6 @@ public:
bool ReturnSession(TKqpSessionCommon* sessionImpl) override;
void DeleteSession(TKqpSessionCommon* sessionImpl) override;
ui32 GetSessionRetryLimit() const;
- static void CloseAndDeleteSession(
- std::unique_ptr<TSession::TImpl>&& impl,
- std::shared_ptr<TTableClient::TImpl> client);
-
void SetStatCollector(const NSdkStats::TStatCollector::TClientStatCollector& collector);