aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordcherednik <dcherednik@ydb.tech>2023-05-17 20:56:19 +0300
committerdcherednik <dcherednik@ydb.tech>2023-05-17 20:56:19 +0300
commit06d41c8ae0d06c6b8532cff54b513a4f10e3db0c (patch)
tree837baa73f3cd3bccd623ec1832800a717a9a1cbb
parent1ae414d53159cbb50c7bc23d2a6f873a9dae99fa (diff)
downloadydb-06d41c8ae0d06c6b8532cff54b513a4f10e3db0c.tar.gz
Subscribe for sessions in case of active session limit issue.
Old behaviour: return an error immediately when the active session limit is reached. New behaviour: subscribe for returning sessions. The subscribers queue is limited to 10 x ActiveSessionLimit. If this queue overflows, the CLIENT_RESOURCE_EXHAUSTED error is returned (just like the old behaviour).
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h13
-rw-r--r--ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/table.cpp170
5 files changed, 160 insertions, 33 deletions
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h b/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h
index e843cda0e0b..2665b62aecc 100644
--- a/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h
+++ b/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h
@@ -183,13 +183,18 @@ public:
struct TSessionPoolStatCollector {
TSessionPoolStatCollector(::NMonitoring::TIntGauge* activeSessions = nullptr
, ::NMonitoring::TIntGauge* inPoolSessions = nullptr
- , ::NMonitoring::TRate* fakeSessions = nullptr)
- : ActiveSessions(activeSessions), InPoolSessions(inPoolSessions), FakeSessions(fakeSessions)
+ , ::NMonitoring::TRate* fakeSessions = nullptr
+ , ::NMonitoring::TIntGauge* waiters = nullptr)
+ : ActiveSessions(activeSessions)
+ , InPoolSessions(inPoolSessions)
+ , FakeSessions(fakeSessions)
+ , Waiters(waiters)
{ }
::NMonitoring::TIntGauge* ActiveSessions;
::NMonitoring::TIntGauge* InPoolSessions;
::NMonitoring::TRate* FakeSessions;
+ ::NMonitoring::TIntGauge* Waiters;
};
struct TClientRetryOperationStatCollector {
@@ -266,6 +271,7 @@ public:
CacheMiss_.Set(sensorsRegistry->Rate({ DatabaseLabel_, {"sensor", "Request/ClientQueryCacheMiss"} }));
ActiveSessions_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "Sessions/InUse"} }));
InPoolSessions_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "Sessions/InPool"} }));
+ Waiters_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "Sessions/WaitForReturn"} }));
SessionCV_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "SessionBalancer/Variation"} }));
SessionRemovedDueBalancing_.Set(sensorsRegistry->Rate({ DatabaseLabel_, {"sensor", "SessionBalancer/SessionsRemoved"} }));
RequestMigrated_.Set(sensorsRegistry->Rate({ DatabaseLabel_, {"sensor", "SessionBalancer/RequestsMigrated"} }));
@@ -348,7 +354,7 @@ public:
return TSessionPoolStatCollector();
}
- return TSessionPoolStatCollector(ActiveSessions_.Get(), InPoolSessions_.Get(), FakeSessions_.Get());
+ return TSessionPoolStatCollector(ActiveSessions_.Get(), InPoolSessions_.Get(), FakeSessions_.Get(), Waiters_.Get());
}
TClientStatCollector GetClientStatCollector() {
@@ -386,6 +392,7 @@ private:
TAtomicCounter<::NMonitoring::TRate> DiscoveryFailDueTransportError_;
TAtomicPointer<::NMonitoring::TIntGauge> ActiveSessions_;
TAtomicPointer<::NMonitoring::TIntGauge> InPoolSessions_;
+ TAtomicPointer<::NMonitoring::TIntGauge> Waiters_;
TAtomicCounter<::NMonitoring::TIntGauge> SessionCV_;
TAtomicCounter<::NMonitoring::TRate> SessionRemovedDueBalancing_;
TAtomicCounter<::NMonitoring::TRate> RequestMigrated_;
diff --git a/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt b/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt
index 9183195ace7..fad066f801a 100644
--- a/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt
+++ b/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt
@@ -1 +1 @@
-2.4.0 \ No newline at end of file
+2.5.0 \ No newline at end of file
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp
index ff1729043a0..35f91b69b0d 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp
@@ -40,7 +40,7 @@ void TRequestMigrator::SetHost(ui64 nodeId) {
CurHost_ = nodeId;
}
-bool TRequestMigrator::IsOurSession(TSession::TImpl* session) const {
+bool TRequestMigrator::IsOurSession(const TSession::TImpl* session) const {
if (!CurHost_)
return false;
@@ -65,7 +65,7 @@ bool TRequestMigrator::Reset() {
}
}
-bool TRequestMigrator::DoCheckAndMigrate(TSession::TImpl* session) {
+bool TRequestMigrator::DoCheckAndMigrate(const TSession::TImpl* session) {
if (session->GetEndpoint().empty())
return false;
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h b/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h
index 2eb7e1b609f..fbe6875b3aa 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h
@@ -51,13 +51,13 @@ public:
// Returns false if session is not suitable or unable to get lock to start migration
// Returns true if session is suitable in this case Unlink methos on the session is called
// This methos is thread safe.
- bool DoCheckAndMigrate(TSession::TImpl* session);
+ bool DoCheckAndMigrate(const TSession::TImpl* session);
// Reset migrator to initiall state if migration was not started and returns true
// Returns false if migration was started
bool Reset();
private:
- bool IsOurSession(TSession::TImpl* session) const;
+ bool IsOurSession(const TSession::TImpl* session) const;
ui64 CurHost_ = 0;
diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp
index 34e708203fc..ed14768f040 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp
@@ -37,6 +37,8 @@ constexpr ui64 KEEP_ALIVE_RANDOM_FRACTION = 4;
constexpr TDuration KEEP_ALIVE_CLIENT_TIMEOUT = TDuration::Seconds(5);
constexpr ui64 PERIODIC_ACTION_BATCH_SIZE = 10; //Max number of tasks to perform during one interval
constexpr ui32 MAX_BACKOFF_DURATION_MS = TDuration::Hours(1).MilliSeconds();
+constexpr TDuration CREATE_SESSION_INTERNAL_TIMEOUT = TDuration::Seconds(2); //Timeout for createSession call inside session pool
+constexpr TDuration MAX_WAIT_SESSION_TIMEOUT = TDuration::Seconds(5); //Max time to wait session
////////////////////////////////////////////////////////////////////////////////
@@ -1382,7 +1384,10 @@ public:
// NOTE: O(n) under session pool lock, should not be used often
bool DropSessionOnEndpoint(std::shared_ptr<TTableClient::TImpl> client, ui64 nodeId);
// Returns true if session returned to pool successfully
- bool ReturnSession(TSession::TImpl* impl, bool active);
+ bool ReturnSession(TSession::TImpl* impl, bool active, std::shared_ptr<TTableClient::TImpl> client);
+ // Returns true if there is a waiter and creation of a new session
+ // was scheduled to feed it
+ bool CheckAndFeedWaiterNewSession(std::shared_ptr<TTableClient::TImpl> client);
TPeriodicCb CreatePeriodicTask(std::weak_ptr<TTableClient::TImpl> weakClient, TKeepAliveCmd&& cmd, TDeletePredicate&& predicate);
i64 GetActiveSessions() const;
i64 GetActiveSessionsLimit() const;
@@ -1399,11 +1404,14 @@ private:
mutable std::mutex Mtx_;
TMultiMap<TInstant, std::unique_ptr<TSession::TImpl>> Sessions_;
+ TMultiMap<TInstant, NThreading::TPromise<TCreateSessionResult>> Waiters_;
+
bool Closed_;
i64 ActiveSessions_;
const ui32 MaxActiveSessions_;
NSdkStats::TSessionCounter ActiveSessionsCounter_;
NSdkStats::TSessionCounter InPoolSessionsCounter_;
+ NSdkStats::TSessionCounter SessionWaiterCounter_;
NSdkStats::TAtomicCounter<::NMonitoring::TRate> FakeSessionsCounter_;
};
@@ -1612,6 +1620,7 @@ public:
};
auto keepAliveCmd = [](TSession session) {
+
Y_VERIFY(session.GetId());
const auto sessionPoolSettings = session.Client_->Settings_.SessionPoolSettings_;
@@ -2315,7 +2324,7 @@ public:
// Also removes NeedUpdateActiveCounter flag
sessionImpl->MarkIdle();
sessionImpl->SetTimeInterval(TDuration::Zero());
- if (!SessionPool_.ReturnSession(sessionImpl, needUpdateCounter)) {
+ if (!SessionPool_.ReturnSession(sessionImpl, needUpdateCounter, shared_from_this())) {
sessionImpl->SetNeedUpdateActiveCounter(needUpdateCounter);
return false;
}
@@ -2323,6 +2332,16 @@ public:
}
void DeleteSession(TSession::TImpl* sessionImpl) {
+ // Closing S_STANDALONE session should not fire getting new session
+ if (sessionImpl->GetState() != TSession::TImpl::S_STANDALONE) {
+ if (SessionPool_.CheckAndFeedWaiterNewSession(shared_from_this())) {
+ // We requested new session for waiter which already incremented
+ // active session counter and old session will be deleted
+ // - skip update active counter in this case
+ sessionImpl->SetNeedUpdateActiveCounter(false);
+ }
+ }
+
if (sessionImpl->NeedUpdateActiveCounter()) {
SessionPool_.DecrementActiveCounter();
}
@@ -2711,8 +2730,8 @@ void TSessionPoolImpl::CreateFakeSession(
std::shared_ptr<TTableClient::TImpl> client)
{
TSession session(client, "", "");
- // Mark broken to prevent returning to session pool
- session.SessionImpl_->MarkBroken();
+ // Mark standalone to prevent returning to session pool
+ session.SessionImpl_->MarkStandalone();
TCreateSessionResult val(
TStatus(
TPlainStatus(
@@ -2734,8 +2753,14 @@ TAsyncCreateSessionResult TSessionPoolImpl::GetSession(
{
auto createSessionPromise = NewPromise<TCreateSessionResult>();
std::unique_ptr<TSession::TImpl> sessionImpl;
+ // TODO: Remove this flag
bool needUpdateActiveSessionCounter = false;
- bool returnFakeSession = false;
+ enum class TSessionSource {
+ Pool,
+ Waiter,
+ Error
+ } sessionSource = TSessionSource::Pool;
+
{
std::lock_guard guard(Mtx_);
if (MaxActiveSessions_) {
@@ -2743,7 +2768,12 @@ TAsyncCreateSessionResult TSessionPoolImpl::GetSession(
ActiveSessions_++;
needUpdateActiveSessionCounter = true;
} else {
- returnFakeSession = true;
+ if (Waiters_.size() < (MaxActiveSessions_ * 10)) {
+ Waiters_.insert(std::make_pair(TInstant::Now(), createSessionPromise));
+ sessionSource = TSessionSource::Waiter;
+ } else {
+ sessionSource = TSessionSource::Error;
+ }
}
} else {
ActiveSessions_++;
@@ -2756,7 +2786,9 @@ TAsyncCreateSessionResult TSessionPoolImpl::GetSession(
}
UpdateStats();
}
- if (returnFakeSession) {
+ if (sessionSource == TSessionSource::Waiter) {
+ return createSessionPromise.GetFuture();
+ } else if (sessionSource == TSessionSource::Error) {
FakeSessionsCounter_.Inc();
CreateFakeSession(createSessionPromise, client);
return createSessionPromise.GetFuture();
@@ -2764,6 +2796,7 @@ TAsyncCreateSessionResult TSessionPoolImpl::GetSession(
Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_IDLE);
Y_VERIFY(!sessionImpl->GetTimeInterval());
Y_VERIFY(needUpdateActiveSessionCounter);
+ Y_VERIFY(sessionSource == TSessionSource::Pool);
sessionImpl->MarkActive();
sessionImpl->SetNeedUpdateActiveCounter(true);
@@ -2778,8 +2811,9 @@ TAsyncCreateSessionResult TSessionPoolImpl::GetSession(
return createSessionPromise.GetFuture();
} else {
+ Y_VERIFY(needUpdateActiveSessionCounter);
const auto& sessionResult = client->CreateSession(settings, false);
- sessionResult.Subscribe(TSession::TImpl::GetSessionInspector(createSessionPromise, client, settings, 0, needUpdateActiveSessionCounter));
+ sessionResult.Subscribe(TSession::TImpl::GetSessionInspector(createSessionPromise, client, settings, 0, true));
return createSessionPromise.GetFuture();
}
}
@@ -2805,19 +2839,80 @@ bool TSessionPoolImpl::DropSessionOnEndpoint(std::shared_ptr<TTableClient::TImpl
return true;
}
-bool TSessionPoolImpl::ReturnSession(TSession::TImpl* impl, bool active) {
+bool TSessionPoolImpl::CheckAndFeedWaiterNewSession(std::shared_ptr<TTableClient::TImpl> client) {
+ NThreading::TPromise<TCreateSessionResult> createSessionPromise;
{
std::lock_guard guard(Mtx_);
if (Closed_)
return false;
- Sessions_.emplace(std::make_pair(impl->GetTimeToTouchFast(), impl));
- if (active) {
- Y_VERIFY(ActiveSessions_);
- ActiveSessions_--;
- impl->SetNeedUpdateActiveCounter(false);
+
+ if (Waiters_.empty())
+ return false;
+
+ auto it = Waiters_.begin();
+ createSessionPromise = it->second;
+ Waiters_.erase(it);
+ }
+
+ // This code can be called from client dtors. It may be unsafe to
+ // execute grpc call directly...
+ client->ScheduleTaskUnsafe([createSessionPromise, client]() mutable {
+ TCreateSessionSettings settings;
+ settings.ClientTimeout(CREATE_SESSION_INTERNAL_TIMEOUT);
+
+ const auto& sessionResult = client->CreateSession(settings, false);
+ sessionResult.Subscribe(TSession::TImpl::GetSessionInspector(createSessionPromise, client, settings, 0, true));
+ }, TDuration());
+ return true;
+}
+
+bool TSessionPoolImpl::ReturnSession(TSession::TImpl* impl, bool active, std::shared_ptr<TTableClient::TImpl> client) {
+ // Do not set promise under the session pool lock
+ NThreading::TPromise<TCreateSessionResult> createSessionPromise;
+ {
+ std::lock_guard guard(Mtx_);
+ if (Closed_)
+ return false;
+
+ if (!Waiters_.empty()) {
+ auto it = Waiters_.begin();
+ createSessionPromise = it->second;
+ Waiters_.erase(it);
+
+ impl->MarkActive();
+
+ // Check session returned from keep-alive task.
+ // See the code below. We did ActiveSessions_--
+ // when returned user session in to the pool so
+ // we must increment it before returning session
+ // to user.
+ if (!active)
+ ActiveSessions_++;
+ impl->SetNeedUpdateActiveCounter(true);
+ } else {
+ Sessions_.emplace(std::make_pair(impl->GetTimeToTouchFast(), impl));
+
+ if (active) {
+ Y_VERIFY(ActiveSessions_);
+ ActiveSessions_--;
+ impl->SetNeedUpdateActiveCounter(false);
+ }
}
UpdateStats();
}
+
+ if (createSessionPromise.Initialized()) {
+ TCreateSessionResult val(TStatus(TPlainStatus()),
+ TSession(client, std::shared_ptr<TSession::TImpl>(
+ impl,
+ TSession::TImpl::GetSmartDeleter(client))));
+
+ client->ScheduleTaskUnsafe([createSessionPromise, val{std::move(val)}]() mutable {
+ createSessionPromise.SetValue(std::move(val));
+ }, TDuration());
+
+ }
+
return true;
}
@@ -2859,23 +2954,41 @@ TPeriodicCb TSessionPoolImpl::CreatePeriodicTask(std::weak_ptr<TTableClient::TIm
sessionsToTouch.reserve(keepAliveBatchSize);
TVector<std::unique_ptr<TSession::TImpl>> sessionsToDelete;
sessionsToDelete.reserve(keepAliveBatchSize);
- auto now = TInstant::Now();
+ TVector<NThreading::TPromise<TCreateSessionResult>> waitersToReplyError;
+ waitersToReplyError.reserve(keepAliveBatchSize);
+ const auto now = TInstant::Now();
{
std::lock_guard guard(Mtx_);
- auto& sessions = Sessions_;
+ {
+ auto& sessions = Sessions_;
+
+ auto it = sessions.begin();
+ while (it != sessions.end() && keepAliveBatchSize--) {
+ if (now < it->second->GetTimeToTouchFast())
+ break;
+
+ if (deletePredicate(it->second.get(), strongClient.get(), sessions.size())) {
+ sessionsToDelete.emplace_back(std::move(it->second));
+ } else {
+ sessionsToTouch.emplace_back(std::move(it->second));
+ }
+ sessions.erase(it++);
+ }
+ }
- auto it = sessions.begin();
- while (it != sessions.end() && keepAliveBatchSize--) {
- if (now < it->second->GetTimeToTouchFast())
- break;
+ {
+ auto it = Waiters_.begin();
- if (deletePredicate(it->second.get(), strongClient.get(), sessions.size())) {
- sessionsToDelete.emplace_back(std::move(it->second));
- } else {
- sessionsToTouch.emplace_back(std::move(it->second));
+ while (it != Waiters_.end()) {
+ if (now < it->first + MAX_WAIT_SESSION_TIMEOUT)
+ break;
+
+ waitersToReplyError.emplace_back(std::move(it->second));
+
+ Waiters_.erase(it++);
}
- sessions.erase(it++);
}
+
UpdateStats();
}
@@ -2895,6 +3008,11 @@ TPeriodicCb TSessionPoolImpl::CreatePeriodicTask(std::weak_ptr<TTableClient::TIm
TTableClient::TImpl::CloseAndDeleteSession(std::move(sessionImpl), strongClient);
}
}
+
+ for (auto& waiter : waitersToReplyError) {
+ FakeSessionsCounter_.Inc();
+ CreateFakeSession(waiter, strongClient);
+ }
}
return true;
@@ -2961,11 +3079,13 @@ void TSessionPoolImpl::SetStatCollector(NSdkStats::TStatCollector::TSessionPoolS
ActiveSessionsCounter_.Set(statCollector.ActiveSessions);
InPoolSessionsCounter_.Set(statCollector.InPoolSessions);
FakeSessionsCounter_.Set(statCollector.FakeSessions);
+ SessionWaiterCounter_.Set(statCollector.Waiters);
}
void TSessionPoolImpl::UpdateStats() {
ActiveSessionsCounter_.Apply(ActiveSessions_);
InPoolSessionsCounter_.Apply(Sessions_.size());
+ SessionWaiterCounter_.Apply(Waiters_.size());
}
TTableClient::TTableClient(const TDriver& driver, const TClientSettings& settings)