aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordcherednik <dcherednik@ydb.tech>2023-07-24 13:17:59 +0300
committerdcherednik <dcherednik@ydb.tech>2023-07-24 13:17:59 +0300
commitd335551d092a04a48aa7ca00f95e4bc5cb066816 (patch)
tree548d3dde259d6eeb3a1eea465b8a4164ac344a38
parent8e9b187a00d40b1cd9ef46953fbff821c7f4869e (diff)
downloadydb-d335551d092a04a48aa7ca00f95e4bc5cb066816.tar.gz
Refactoring. Move out client related functions from session pool. KIKIMR-18788
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp5
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h1
-rw-r--r--ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp123
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.h54
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp70
6 files changed, 130 insertions, 125 deletions
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp
index a4cf92fd61..6a083f4750 100644
--- a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp
@@ -73,11 +73,6 @@ void TKqpSessionCommon::MarkAsClosing() {
}
}
-void TKqpSessionCommon::MarkStandalone() {
- State_ = EState::S_STANDALONE;
- NeedUpdateActiveCounter_ = false;
-}
-
void TKqpSessionCommon::MarkActive() {
State_ = EState::S_ACTIVE;
NeedUpdateActiveCounter_ = false;
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h
index 58d59b56ca..a932f7e54e 100644
--- a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h
@@ -34,7 +34,6 @@ public:
const TEndpointKey& GetEndpointKey() const;
void MarkBroken();
void MarkAsClosing();
- void MarkStandalone();
void MarkActive();
void MarkIdle();
bool IsOwnedBySessionPool() const;
diff --git a/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt b/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt
index 21b159dc8b..a4db534a2d 100644
--- a/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt
+++ b/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt
@@ -1 +1 @@
-2.5.2 \ No newline at end of file
+2.5.3 \ No newline at end of file
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp
index 53e98fde73..980951bb19 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp
@@ -7,6 +7,12 @@ namespace NTable {
using namespace NThreading;
+static const TStatus CLIENT_RESOURCE_EXHAUSTED_ACTIVE_SESSION_LIMIT = TStatus(
+ TPlainStatus(
+ EStatus::CLIENT_RESOURCE_EXHAUSTED,
+ "Active sessions limit exceeded"
+ )
+ );
TSessionPool::TWaitersQueue::TWaitersQueue(ui32 maxQueueSize, TDuration maxWaitSessionTimeout)
: MaxQueueSize_(maxQueueSize)
@@ -14,25 +20,25 @@ TSessionPool::TWaitersQueue::TWaitersQueue(ui32 maxQueueSize, TDuration maxWaitS
{
}
-bool TSessionPool::TWaitersQueue::TryPush(NThreading::TPromise<TCreateSessionResult>& p) {
+bool TSessionPool::TWaitersQueue::TryPush(std::unique_ptr<IGetSessionCtx>& p) {
if (Waiters_.size() < MaxQueueSize_) {
- Waiters_.insert(std::make_pair(TInstant::Now(), p));
+ Waiters_.insert(std::make_pair(TInstant::Now(), std::move(p)));
return true;
}
return false;
}
-TMaybe<NThreading::TPromise<TCreateSessionResult>> TSessionPool::TWaitersQueue::TryGet() {
+std::unique_ptr<IGetSessionCtx> TSessionPool::TWaitersQueue::TryGet() {
if (Waiters_.empty()) {
return {};
}
auto it = Waiters_.begin();
- auto result = it->second;
+ auto result = std::move(it->second);
Waiters_.erase(it);
return result;
}
-void TSessionPool::TWaitersQueue::GetOld(TInstant now, TVector<NThreading::TPromise<TCreateSessionResult>>& oldWaiters) {
+void TSessionPool::TWaitersQueue::GetOld(TInstant now, TVector<std::unique_ptr<IGetSessionCtx>>& oldWaiters) {
auto it = Waiters_.begin();
while (it != Waiters_.end()) {
if (now < it->first + MaxWaitSessionTimeout_)
@@ -65,63 +71,20 @@ void TTableClient::TImpl::CloseAndDeleteSession(std::unique_ptr<TSession::TImpl>
deleteSession->MarkBroken();
}
-void TSessionPool::CreateFakeSession(
- NThreading::TPromise<TCreateSessionResult>& promise,
- std::shared_ptr<TTableClient::TImpl> client)
-{
- TSession session(client, "", "", false);
- // Mark standalone to prevent returning to session pool
- session.SessionImpl_->MarkStandalone();
- TCreateSessionResult val(
- TStatus(
- TPlainStatus(
- EStatus::CLIENT_RESOURCE_EXHAUSTED,
- "Active sessions limit exceeded"
- )
- ),
- std::move(session)
- );
-
- client->ScheduleTaskUnsafe([promise, val{std::move(val)}]() mutable {
- promise.SetValue(std::move(val));
- }, TDuration());
-}
-
-void TSessionPool::MakeSessionPromiseFromSession(
+void TSessionPool::ReplySessionToUser(
TKqpSessionCommon* session,
- NThreading::TPromise<TCreateSessionResult>& promise,
- std::shared_ptr<TTableClient::TImpl> client
-) {
+ std::unique_ptr<IGetSessionCtx> ctx)
+{
Y_VERIFY(session->GetState() == TSession::TImpl::S_IDLE);
Y_VERIFY(!session->GetTimeInterval());
session->MarkActive();
session->SetNeedUpdateActiveCounter(true);
-
- TCreateSessionResult val(
- TStatus(TPlainStatus()),
- TSession(
- client,
- std::shared_ptr<TSession::TImpl>(
- static_cast<TSession::TImpl*>(session),
- TSession::TImpl::GetSmartDeleter(client)
- )
- )
- );
-
- client->ScheduleTaskUnsafe(
- [promise, val{std::move(val)}]() mutable {
- promise.SetValue(std::move(val));
- },
- TDuration()
- );
+ ctx->ReplySessionToUser(session);
}
-TAsyncCreateSessionResult TSessionPool::GetSession(
- std::shared_ptr<TTableClient::TImpl> client,
- const TCreateSessionSettings& settings)
+void TSessionPool::GetSession(std::unique_ptr<IGetSessionCtx> ctx)
{
- auto createSessionPromise = NewPromise<TCreateSessionResult>();
- std::unique_ptr<TSession::TImpl> sessionImpl;
+ std::unique_ptr<TKqpSessionCommon> sessionImpl;
enum class TSessionSource {
Pool,
Waiter,
@@ -133,7 +96,7 @@ TAsyncCreateSessionResult TSessionPool::GetSession(
if (MaxActiveSessions_ == 0 || ActiveSessions_ < MaxActiveSessions_) {
IncrementActiveCounterUnsafe();
- } else if (WaitersQueue_.TryPush(createSessionPromise)) {
+ } else if (WaitersQueue_.TryPush(ctx)) {
sessionSource = TSessionSource::Waiter;
} else {
sessionSource = TSessionSource::Error;
@@ -143,33 +106,31 @@ TAsyncCreateSessionResult TSessionPool::GetSession(
sessionImpl = std::move(it->second);
Sessions_.erase(it);
}
-
+
UpdateStats();
}
+
if (sessionSource == TSessionSource::Waiter) {
// Nothing to do here
} else if (sessionSource == TSessionSource::Error) {
FakeSessionsCounter_.Inc();
- CreateFakeSession(createSessionPromise, client);
+ ctx->ReplyError(CLIENT_RESOURCE_EXHAUSTED_ACTIVE_SESSION_LIMIT);
} else if (sessionImpl) {
- MakeSessionPromiseFromSession(sessionImpl.release(), createSessionPromise, client);
+ ReplySessionToUser(sessionImpl.release(), std::move(ctx));
} else {
- const auto& sessionResult = client->CreateSession(settings, false);
- sessionResult.Subscribe(TSession::TImpl::GetSessionInspector(createSessionPromise, client, settings, 0, true));
+ ctx->ReplyNewSession();
}
-
- return createSessionPromise.GetFuture();
}
-bool TSessionPool::CheckAndFeedWaiterNewSession(std::shared_ptr<TTableClient::TImpl> client, bool active) {
- NThreading::TPromise<TCreateSessionResult> createSessionPromise;
+bool TSessionPool::CheckAndFeedWaiterNewSession(bool active) {
+ std::unique_ptr<IGetSessionCtx> getSessionCtx;
{
std::lock_guard guard(Mtx_);
if (Closed_)
return false;
- if (auto prom = WaitersQueue_.TryGet()) {
- createSessionPromise = std::move(*prom);
+ if (auto maybeCtx = WaitersQueue_.TryGet()) {
+ getSessionCtx = std::move(maybeCtx);
} else {
return false;
}
@@ -186,32 +147,24 @@ bool TSessionPool::CheckAndFeedWaiterNewSession(std::shared_ptr<TTableClient::TI
// The above mentioned conditions is quite rare so
// we can just return an error. In this case uplevel code should
// start retry.
- CreateFakeSession(createSessionPromise, client);
+ getSessionCtx->ReplyError(CLIENT_RESOURCE_EXHAUSTED_ACTIVE_SESSION_LIMIT);
return true;
}
- // This code can be called from client dtors. It may be unsafe to
- // execute grpc call directly...
- client->ScheduleTaskUnsafe([createSessionPromise, client]() mutable {
- TCreateSessionSettings settings;
- settings.ClientTimeout(CREATE_SESSION_INTERNAL_TIMEOUT);
-
- const auto& sessionResult = client->CreateSession(settings, false);
- sessionResult.Subscribe(TSession::TImpl::GetSessionInspector(createSessionPromise, client, settings, 0, true));
- }, TDuration());
+ getSessionCtx->ReplyNewSession();
return true;
}
-bool TSessionPool::ReturnSession(TKqpSessionCommon* impl, bool active, std::shared_ptr<TTableClient::TImpl> client) {
- // Do not set promise under the session pool lock
- NThreading::TPromise<TCreateSessionResult> createSessionPromise;
+bool TSessionPool::ReturnSession(TKqpSessionCommon* impl, bool active) {
+ // Do not call ReplySessionToUser under the session pool lock
+ std::unique_ptr<IGetSessionCtx> getSessionCtx;
{
std::lock_guard guard(Mtx_);
if (Closed_)
return false;
- if (auto prom = WaitersQueue_.TryGet()) {
- createSessionPromise = std::move(*prom);
+ if (auto maybeCtx = WaitersQueue_.TryGet()) {
+ getSessionCtx = std::move(maybeCtx);
if (!active)
IncrementActiveCounterUnsafe();
} else {
@@ -228,8 +181,8 @@ bool TSessionPool::ReturnSession(TKqpSessionCommon* impl, bool active, std::shar
UpdateStats();
}
- if (createSessionPromise.Initialized()) {
- MakeSessionPromiseFromSession(impl, createSessionPromise, client);
+ if (getSessionCtx) {
+ ReplySessionToUser(impl, std::move(getSessionCtx));
}
return true;
@@ -278,7 +231,7 @@ TPeriodicCb TSessionPool::CreatePeriodicTask(std::weak_ptr<TTableClient::TImpl>
sessionsToTouch.reserve(keepAliveBatchSize);
TVector<std::unique_ptr<TSession::TImpl>> sessionsToDelete;
sessionsToDelete.reserve(keepAliveBatchSize);
- TVector<NThreading::TPromise<TCreateSessionResult>> waitersToReplyError;
+ TVector<std::unique_ptr<IGetSessionCtx>> waitersToReplyError;
waitersToReplyError.reserve(keepAliveBatchSize);
const auto now = TInstant::Now();
{
@@ -324,7 +277,7 @@ TPeriodicCb TSessionPool::CreatePeriodicTask(std::weak_ptr<TTableClient::TImpl>
for (auto& waiter : waitersToReplyError) {
FakeSessionsCounter_.Inc();
- CreateFakeSession(waiter, strongClient);
+ waiter->ReplyError(CLIENT_RESOURCE_EXHAUSTED_ACTIVE_SESSION_LIMIT);
}
}
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.h b/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.h
index 702fe185f7..a37df781b6 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.h
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.h
@@ -8,50 +8,62 @@
namespace NYdb {
+
+namespace NSessionPool {
+
+class IGetSessionCtx : private TNonCopyable {
+public:
+ virtual ~IGetSessionCtx() = default;
+ // Transfer ownership to ctx
+ virtual void ReplySessionToUser(TKqpSessionCommon* session) = 0;
+ virtual void ReplyError(TStatus status) = 0;
+ virtual void ReplyNewSession() = 0;
+};
+
+}
+
namespace NTable {
using namespace NThreading;
-
+using namespace NSessionPool;
constexpr TDuration MAX_WAIT_SESSION_TIMEOUT = TDuration::Seconds(5); //Max time to wait session
constexpr ui64 PERIODIC_ACTION_BATCH_SIZE = 10; //Max number of tasks to perform during one interval
constexpr TDuration CREATE_SESSION_INTERNAL_TIMEOUT = TDuration::Seconds(2); //Timeout for createSession call inside session pool
-
class TSessionPool {
- typedef TAsyncCreateSessionResult
- (*TAwareSessonProvider)
- (std::shared_ptr<TTableClient::TImpl> client, const TCreateSessionSettings& settings);
private:
class TWaitersQueue {
public:
TWaitersQueue(ui32 maxQueueSize, TDuration maxWaitSessionTimeout=MAX_WAIT_SESSION_TIMEOUT);
- bool TryPush(NThreading::TPromise<TCreateSessionResult>& p);
- TMaybe<NThreading::TPromise<TCreateSessionResult>> TryGet();
- void GetOld(TInstant now, TVector<NThreading::TPromise<TCreateSessionResult>>& oldWaiters);
+ // returns true and gets ownership if queue size less than limit
+ // otherwise returns false and doesn't not touch ctx
+ bool TryPush(std::unique_ptr<IGetSessionCtx>& p);
+ std::unique_ptr<IGetSessionCtx> TryGet();
+ void GetOld(TInstant now, TVector<std::unique_ptr<IGetSessionCtx>>& oldWaiters);
ui32 Size() const;
private:
const ui32 MaxQueueSize_;
const TDuration MaxWaitSessionTimeout_;
- TMultiMap<TInstant, NThreading::TPromise<TCreateSessionResult>> Waiters_;
+ TMultiMap<TInstant, std::unique_ptr<IGetSessionCtx>> Waiters_;
};
public:
using TKeepAliveCmd = std::function<void(TSession session)>;
using TDeletePredicate = std::function<bool(TSession::TImpl* session, TTableClient::TImpl* client, size_t sessionsCount)>;
TSessionPool(ui32 maxActiveSessions);
- // TAwareSessonProvider:
- // function is called if session pool is empty,
- // this is used for additional total session count limitation
- TAsyncCreateSessionResult GetSession(
- std::shared_ptr<TTableClient::TImpl> client,
- const TCreateSessionSettings& settings);
+
+ // Extracts session from pool or creates new one ising given ctx
+ void GetSession(std::unique_ptr<IGetSessionCtx> ctx);
+
// Returns true if session returned to pool successfully
- bool ReturnSession(TKqpSessionCommon* impl, bool active, std::shared_ptr<TTableClient::TImpl> client);
+ bool ReturnSession(TKqpSessionCommon* impl, bool active);
+
// Returns trun if has waiter and scheduled to create new session
// too feed it
- bool CheckAndFeedWaiterNewSession(std::shared_ptr<TTableClient::TImpl> client, bool active);
+ bool CheckAndFeedWaiterNewSession(bool active);
+
TPeriodicCb CreatePeriodicTask(std::weak_ptr<TTableClient::TImpl> weakClient, TKeepAliveCmd&& cmd, TDeletePredicate&& predicate);
i64 GetActiveSessions() const;
i64 GetActiveSessionsLimit() const;
@@ -62,15 +74,9 @@ public:
void Drain(std::function<bool(std::unique_ptr<TSession::TImpl>&&)> cb, bool close);
void SetStatCollector(NSdkStats::TStatCollector::TSessionPoolStatCollector collector);
- static void CreateFakeSession(NThreading::TPromise<TCreateSessionResult>& promise,
- std::shared_ptr<TTableClient::TImpl> client);
private:
void UpdateStats();
- void MakeSessionPromiseFromSession(
- TKqpSessionCommon* session,
- NThreading::TPromise<TCreateSessionResult>& promise,
- std::shared_ptr<TTableClient::TImpl> client
- );
+ static void ReplySessionToUser(TKqpSessionCommon* session, std::unique_ptr<IGetSessionCtx> ctx);
mutable std::mutex Mtx_;
bool Closed_;
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp
index 3ddb4495c4..daece9edf9 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp
@@ -1,15 +1,13 @@
#include "table_client.h"
-
namespace NYdb {
+
namespace NTable {
using namespace NThreading;
-
const TKeepAliveSettings TTableClient::TImpl::KeepAliveSettings = TKeepAliveSettings().ClientTimeout(KEEP_ALIVE_CLIENT_TIMEOUT);
-
bool IsSessionCloseRequested(const TStatus& status) {
const auto& meta = status.GetResponseMetadata();
auto hints = meta.equal_range(NYdb::YDB_SERVER_HINTS);
@@ -341,7 +339,63 @@ void TTableClient::TImpl::StartPeriodicHostScanTask() {
}
TAsyncCreateSessionResult TTableClient::TImpl::GetSession(const TCreateSessionSettings& settings) {
- return SessionPool_.GetSession(shared_from_this(), settings);
+ using namespace NSessionPool;
+
+ class TTableClientGetSessionCtx : public IGetSessionCtx {
+ public:
+ TTableClientGetSessionCtx(std::shared_ptr<TTableClient::TImpl> client, TDuration clientTimeout)
+ : Promise(NewPromise<TCreateSessionResult>())
+ , Client(client)
+ , ClientTimeout(clientTimeout)
+ {}
+
+ TAsyncCreateSessionResult GetFuture() {
+ return Promise.GetFuture();
+ }
+
+ void ReplyError(TStatus status) override {
+ TSession session(Client, "", "", false);
+ ScheduleReply(TCreateSessionResult(std::move(status), std::move(session)));
+ }
+
+ void ReplySessionToUser(TKqpSessionCommon* session) override {
+ TCreateSessionResult val(
+ TStatus(TPlainStatus()),
+ TSession(
+ Client,
+ std::shared_ptr<TSession::TImpl>(
+ static_cast<TSession::TImpl*>(session),
+ TSession::TImpl::GetSmartDeleter(Client)
+ )
+ )
+ );
+
+ ScheduleReply(std::move(val));
+ }
+
+ void ReplyNewSession() override {
+ TCreateSessionSettings settings;
+ settings.ClientTimeout(ClientTimeout);
+ const auto& sessionResult = Client->CreateSession(settings, false);
+ sessionResult.Subscribe(TSession::TImpl::GetSessionInspector(Promise, Client, settings, 0, true));
+ }
+
+ private:
+ void ScheduleReply(TCreateSessionResult val) {
+ //TODO: Do we realy need it?
+ Client->ScheduleTaskUnsafe([promise{std::move(Promise)}, val{std::move(val)}]() mutable {
+ promise.SetValue(std::move(val));
+ }, TDuration());
+ }
+ NThreading::TPromise<TCreateSessionResult> Promise;
+ std::shared_ptr<TTableClient::TImpl> Client;
+ const TDuration ClientTimeout;
+ };
+
+ auto ctx = std::make_unique<TTableClientGetSessionCtx>(shared_from_this(), settings.ClientTimeout_);
+ auto future = ctx->GetFuture();
+ SessionPool_.GetSession(std::move(ctx));
+ return future;
}
i64 TTableClient::TImpl::GetActiveSessionCount() const {
@@ -374,9 +428,7 @@ TAsyncCreateSessionResult TTableClient::TImpl::CreateSession(const TCreateSessio
}
auto session = TSession(self, result.session_id(), status.Endpoint, !standalone);
if (status.Ok()) {
- if (standalone) {
- session.SessionImpl_->MarkStandalone();
- } else {
+ if (!standalone) {
session.SessionImpl_->MarkActive();
}
self->DbDriverState_->StatCollector.IncSessionsOnHost(status.Endpoint);
@@ -860,7 +912,7 @@ bool TTableClient::TImpl::ReturnSession(TKqpSessionCommon* sessionImpl) {
// Also removes NeedUpdateActiveCounter flag
sessionImpl->MarkIdle();
sessionImpl->SetTimeInterval(TDuration::Zero());
- if (!SessionPool_.ReturnSession(sessionImpl, needUpdateCounter, shared_from_this())) {
+ if (!SessionPool_.ReturnSession(sessionImpl, needUpdateCounter)) {
sessionImpl->SetNeedUpdateActiveCounter(needUpdateCounter);
return false;
}
@@ -870,7 +922,7 @@ bool TTableClient::TImpl::ReturnSession(TKqpSessionCommon* sessionImpl) {
void TTableClient::TImpl::DeleteSession(TKqpSessionCommon* sessionImpl) {
// Closing not owned by session pool session should not fire getting new session
if (sessionImpl->IsOwnedBySessionPool()) {
- if (SessionPool_.CheckAndFeedWaiterNewSession(shared_from_this(), sessionImpl->NeedUpdateActiveCounter())) {
+ if (SessionPool_.CheckAndFeedWaiterNewSession(sessionImpl->NeedUpdateActiveCounter())) {
// We requested new session for waiter which already incremented
// active session counter and old session will be deleted
// - skip update active counter in this case