diff options
author | dcherednik <dcherednik@ydb.tech> | 2023-05-17 20:56:19 +0300 |
---|---|---|
committer | dcherednik <dcherednik@ydb.tech> | 2023-05-17 20:56:19 +0300 |
commit | 06d41c8ae0d06c6b8532cff54b513a4f10e3db0c (patch) | |
tree | 837baa73f3cd3bccd623ec1832800a717a9a1cbb | |
parent | 1ae414d53159cbb50c7bc23d2a6f873a9dae99fa (diff) | |
download | ydb-06d41c8ae0d06c6b8532cff54b513a4f10e3db0c.tar.gz |
Subscribe for sessions in case of active session limit issue.
Old behaviour: return an error immediately when the active session limit is reached.
New behaviour: subscribe for returned sessions. The subscriber queue is limited to 10 x the active session limit; if this queue overflows, a CLIENT_RESOURCE_EXHAUSTED error is returned (just like the old behaviour).
5 files changed, 160 insertions, 33 deletions
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h b/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h index e843cda0e0b..2665b62aecc 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h +++ b/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h @@ -183,13 +183,18 @@ public: struct TSessionPoolStatCollector { TSessionPoolStatCollector(::NMonitoring::TIntGauge* activeSessions = nullptr , ::NMonitoring::TIntGauge* inPoolSessions = nullptr - , ::NMonitoring::TRate* fakeSessions = nullptr) - : ActiveSessions(activeSessions), InPoolSessions(inPoolSessions), FakeSessions(fakeSessions) + , ::NMonitoring::TRate* fakeSessions = nullptr + , ::NMonitoring::TIntGauge* waiters = nullptr) + : ActiveSessions(activeSessions) + , InPoolSessions(inPoolSessions) + , FakeSessions(fakeSessions) + , Waiters(waiters) { } ::NMonitoring::TIntGauge* ActiveSessions; ::NMonitoring::TIntGauge* InPoolSessions; ::NMonitoring::TRate* FakeSessions; + ::NMonitoring::TIntGauge* Waiters; }; struct TClientRetryOperationStatCollector { @@ -266,6 +271,7 @@ public: CacheMiss_.Set(sensorsRegistry->Rate({ DatabaseLabel_, {"sensor", "Request/ClientQueryCacheMiss"} })); ActiveSessions_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "Sessions/InUse"} })); InPoolSessions_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "Sessions/InPool"} })); + Waiters_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "Sessions/WaitForReturn"} })); SessionCV_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "SessionBalancer/Variation"} })); SessionRemovedDueBalancing_.Set(sensorsRegistry->Rate({ DatabaseLabel_, {"sensor", "SessionBalancer/SessionsRemoved"} })); RequestMigrated_.Set(sensorsRegistry->Rate({ DatabaseLabel_, {"sensor", "SessionBalancer/RequestsMigrated"} })); @@ -348,7 +354,7 @@ public: return TSessionPoolStatCollector(); } - return TSessionPoolStatCollector(ActiveSessions_.Get(), InPoolSessions_.Get(), FakeSessions_.Get()); + return 
TSessionPoolStatCollector(ActiveSessions_.Get(), InPoolSessions_.Get(), FakeSessions_.Get(), Waiters_.Get()); } TClientStatCollector GetClientStatCollector() { @@ -386,6 +392,7 @@ private: TAtomicCounter<::NMonitoring::TRate> DiscoveryFailDueTransportError_; TAtomicPointer<::NMonitoring::TIntGauge> ActiveSessions_; TAtomicPointer<::NMonitoring::TIntGauge> InPoolSessions_; + TAtomicPointer<::NMonitoring::TIntGauge> Waiters_; TAtomicCounter<::NMonitoring::TIntGauge> SessionCV_; TAtomicCounter<::NMonitoring::TRate> SessionRemovedDueBalancing_; TAtomicCounter<::NMonitoring::TRate> RequestMigrated_; diff --git a/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt b/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt index 9183195ace7..fad066f801a 100644 --- a/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt +++ b/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt @@ -1 +1 @@ -2.4.0
\ No newline at end of file +2.5.0
\ No newline at end of file diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp index ff1729043a0..35f91b69b0d 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp @@ -40,7 +40,7 @@ void TRequestMigrator::SetHost(ui64 nodeId) { CurHost_ = nodeId; } -bool TRequestMigrator::IsOurSession(TSession::TImpl* session) const { +bool TRequestMigrator::IsOurSession(const TSession::TImpl* session) const { if (!CurHost_) return false; @@ -65,7 +65,7 @@ bool TRequestMigrator::Reset() { } } -bool TRequestMigrator::DoCheckAndMigrate(TSession::TImpl* session) { +bool TRequestMigrator::DoCheckAndMigrate(const TSession::TImpl* session) { if (session->GetEndpoint().empty()) return false; diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h b/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h index 2eb7e1b609f..fbe6875b3aa 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h @@ -51,13 +51,13 @@ public: // Returns false if session is not suitable or unable to get lock to start migration // Returns true if session is suitable in this case Unlink methos on the session is called // This methos is thread safe. 
- bool DoCheckAndMigrate(TSession::TImpl* session); + bool DoCheckAndMigrate(const TSession::TImpl* session); // Reset migrator to initiall state if migration was not started and returns true // Returns false if migration was started bool Reset(); private: - bool IsOurSession(TSession::TImpl* session) const; + bool IsOurSession(const TSession::TImpl* session) const; ui64 CurHost_ = 0; diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp index 34e708203fc..ed14768f040 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp @@ -37,6 +37,8 @@ constexpr ui64 KEEP_ALIVE_RANDOM_FRACTION = 4; constexpr TDuration KEEP_ALIVE_CLIENT_TIMEOUT = TDuration::Seconds(5); constexpr ui64 PERIODIC_ACTION_BATCH_SIZE = 10; //Max number of tasks to perform during one interval constexpr ui32 MAX_BACKOFF_DURATION_MS = TDuration::Hours(1).MilliSeconds(); +constexpr TDuration CREATE_SESSION_INTERNAL_TIMEOUT = TDuration::Seconds(2); //Timeout for createSession call inside session pool +constexpr TDuration MAX_WAIT_SESSION_TIMEOUT = TDuration::Seconds(5); //Max time to wait session //////////////////////////////////////////////////////////////////////////////// @@ -1382,7 +1384,10 @@ public: // NOTE: O(n) under session pool lock, should not be used often bool DropSessionOnEndpoint(std::shared_ptr<TTableClient::TImpl> client, ui64 nodeId); // Returns true if session returned to pool successfully - bool ReturnSession(TSession::TImpl* impl, bool active); + bool ReturnSession(TSession::TImpl* impl, bool active, std::shared_ptr<TTableClient::TImpl> client); + // Returns trun if has waiter and scheduled to create new session + // too feed it + bool CheckAndFeedWaiterNewSession(std::shared_ptr<TTableClient::TImpl> client); TPeriodicCb CreatePeriodicTask(std::weak_ptr<TTableClient::TImpl> weakClient, TKeepAliveCmd&& cmd, TDeletePredicate&& predicate); i64 GetActiveSessions() const; i64 
GetActiveSessionsLimit() const; @@ -1399,11 +1404,14 @@ private: mutable std::mutex Mtx_; TMultiMap<TInstant, std::unique_ptr<TSession::TImpl>> Sessions_; + TMultiMap<TInstant, NThreading::TPromise<TCreateSessionResult>> Waiters_; + bool Closed_; i64 ActiveSessions_; const ui32 MaxActiveSessions_; NSdkStats::TSessionCounter ActiveSessionsCounter_; NSdkStats::TSessionCounter InPoolSessionsCounter_; + NSdkStats::TSessionCounter SessionWaiterCounter_; NSdkStats::TAtomicCounter<::NMonitoring::TRate> FakeSessionsCounter_; }; @@ -1612,6 +1620,7 @@ public: }; auto keepAliveCmd = [](TSession session) { + Y_VERIFY(session.GetId()); const auto sessionPoolSettings = session.Client_->Settings_.SessionPoolSettings_; @@ -2315,7 +2324,7 @@ public: // Also removes NeedUpdateActiveCounter flag sessionImpl->MarkIdle(); sessionImpl->SetTimeInterval(TDuration::Zero()); - if (!SessionPool_.ReturnSession(sessionImpl, needUpdateCounter)) { + if (!SessionPool_.ReturnSession(sessionImpl, needUpdateCounter, shared_from_this())) { sessionImpl->SetNeedUpdateActiveCounter(needUpdateCounter); return false; } @@ -2323,6 +2332,16 @@ public: } void DeleteSession(TSession::TImpl* sessionImpl) { + // Closing S_STANDALONE session should not fire getting new session + if (sessionImpl->GetState() != TSession::TImpl::S_STANDALONE) { + if (SessionPool_.CheckAndFeedWaiterNewSession(shared_from_this())) { + // We requested new session for waiter which already incremented + // active session counter and old session will be deleted + // - skip update active counter in this case + sessionImpl->SetNeedUpdateActiveCounter(false); + } + } + if (sessionImpl->NeedUpdateActiveCounter()) { SessionPool_.DecrementActiveCounter(); } @@ -2711,8 +2730,8 @@ void TSessionPoolImpl::CreateFakeSession( std::shared_ptr<TTableClient::TImpl> client) { TSession session(client, "", ""); - // Mark broken to prevent returning to session pool - session.SessionImpl_->MarkBroken(); + // Mark standalone to prevent returning to session 
pool + session.SessionImpl_->MarkStandalone(); TCreateSessionResult val( TStatus( TPlainStatus( @@ -2734,8 +2753,14 @@ TAsyncCreateSessionResult TSessionPoolImpl::GetSession( { auto createSessionPromise = NewPromise<TCreateSessionResult>(); std::unique_ptr<TSession::TImpl> sessionImpl; + // TODO: Remove this flag bool needUpdateActiveSessionCounter = false; - bool returnFakeSession = false; + enum class TSessionSource { + Pool, + Waiter, + Error + } sessionSource = TSessionSource::Pool; + { std::lock_guard guard(Mtx_); if (MaxActiveSessions_) { @@ -2743,7 +2768,12 @@ TAsyncCreateSessionResult TSessionPoolImpl::GetSession( ActiveSessions_++; needUpdateActiveSessionCounter = true; } else { - returnFakeSession = true; + if (Waiters_.size() < (MaxActiveSessions_ * 10)) { + Waiters_.insert(std::make_pair(TInstant::Now(), createSessionPromise)); + sessionSource = TSessionSource::Waiter; + } else { + sessionSource = TSessionSource::Error; + } } } else { ActiveSessions_++; @@ -2756,7 +2786,9 @@ TAsyncCreateSessionResult TSessionPoolImpl::GetSession( } UpdateStats(); } - if (returnFakeSession) { + if (sessionSource == TSessionSource::Waiter) { + return createSessionPromise.GetFuture(); + } else if (sessionSource == TSessionSource::Error) { FakeSessionsCounter_.Inc(); CreateFakeSession(createSessionPromise, client); return createSessionPromise.GetFuture(); @@ -2764,6 +2796,7 @@ TAsyncCreateSessionResult TSessionPoolImpl::GetSession( Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_IDLE); Y_VERIFY(!sessionImpl->GetTimeInterval()); Y_VERIFY(needUpdateActiveSessionCounter); + Y_VERIFY(sessionSource == TSessionSource::Pool); sessionImpl->MarkActive(); sessionImpl->SetNeedUpdateActiveCounter(true); @@ -2778,8 +2811,9 @@ TAsyncCreateSessionResult TSessionPoolImpl::GetSession( return createSessionPromise.GetFuture(); } else { + Y_VERIFY(needUpdateActiveSessionCounter); const auto& sessionResult = client->CreateSession(settings, false); - 
sessionResult.Subscribe(TSession::TImpl::GetSessionInspector(createSessionPromise, client, settings, 0, needUpdateActiveSessionCounter)); + sessionResult.Subscribe(TSession::TImpl::GetSessionInspector(createSessionPromise, client, settings, 0, true)); return createSessionPromise.GetFuture(); } } @@ -2805,19 +2839,80 @@ bool TSessionPoolImpl::DropSessionOnEndpoint(std::shared_ptr<TTableClient::TImpl return true; } -bool TSessionPoolImpl::ReturnSession(TSession::TImpl* impl, bool active) { +bool TSessionPoolImpl::CheckAndFeedWaiterNewSession(std::shared_ptr<TTableClient::TImpl> client) { + NThreading::TPromise<TCreateSessionResult> createSessionPromise; { std::lock_guard guard(Mtx_); if (Closed_) return false; - Sessions_.emplace(std::make_pair(impl->GetTimeToTouchFast(), impl)); - if (active) { - Y_VERIFY(ActiveSessions_); - ActiveSessions_--; - impl->SetNeedUpdateActiveCounter(false); + + if (Waiters_.empty()) + return false; + + auto it = Waiters_.begin(); + createSessionPromise = it->second; + Waiters_.erase(it); + } + + // This code can be called from client dtors. It may be unsafe to + // execute grpc call directly... 
+ client->ScheduleTaskUnsafe([createSessionPromise, client]() mutable { + TCreateSessionSettings settings; + settings.ClientTimeout(CREATE_SESSION_INTERNAL_TIMEOUT); + + const auto& sessionResult = client->CreateSession(settings, false); + sessionResult.Subscribe(TSession::TImpl::GetSessionInspector(createSessionPromise, client, settings, 0, true)); + }, TDuration()); + return true; +} + +bool TSessionPoolImpl::ReturnSession(TSession::TImpl* impl, bool active, std::shared_ptr<TTableClient::TImpl> client) { + // Do not set promise under the session pool lock + NThreading::TPromise<TCreateSessionResult> createSessionPromise; + { + std::lock_guard guard(Mtx_); + if (Closed_) + return false; + + if (!Waiters_.empty()) { + auto it = Waiters_.begin(); + createSessionPromise = it->second; + Waiters_.erase(it); + + impl->MarkActive(); + + // Check session returned from keep-alive task. + // See the code below. We did ActiveSessions_-- + // when returned user session in to the pool so + // we must increment it before returning session + // to user. 
+ if (!active) + ActiveSessions_++; + impl->SetNeedUpdateActiveCounter(true); + } else { + Sessions_.emplace(std::make_pair(impl->GetTimeToTouchFast(), impl)); + + if (active) { + Y_VERIFY(ActiveSessions_); + ActiveSessions_--; + impl->SetNeedUpdateActiveCounter(false); + } } UpdateStats(); } + + if (createSessionPromise.Initialized()) { + TCreateSessionResult val(TStatus(TPlainStatus()), + TSession(client, std::shared_ptr<TSession::TImpl>( + impl, + TSession::TImpl::GetSmartDeleter(client)))); + + client->ScheduleTaskUnsafe([createSessionPromise, val{std::move(val)}]() mutable { + createSessionPromise.SetValue(std::move(val)); + }, TDuration()); + + } + return true; } @@ -2859,23 +2954,41 @@ TPeriodicCb TSessionPoolImpl::CreatePeriodicTask(std::weak_ptr<TTableClient::TIm sessionsToTouch.reserve(keepAliveBatchSize); TVector<std::unique_ptr<TSession::TImpl>> sessionsToDelete; sessionsToDelete.reserve(keepAliveBatchSize); - auto now = TInstant::Now(); + TVector<NThreading::TPromise<TCreateSessionResult>> waitersToReplyError; + waitersToReplyError.reserve(keepAliveBatchSize); + const auto now = TInstant::Now(); { std::lock_guard guard(Mtx_); - auto& sessions = Sessions_; + { + auto& sessions = Sessions_; + + auto it = sessions.begin(); + while (it != sessions.end() && keepAliveBatchSize--) { + if (now < it->second->GetTimeToTouchFast()) + break; + + if (deletePredicate(it->second.get(), strongClient.get(), sessions.size())) { + sessionsToDelete.emplace_back(std::move(it->second)); + } else { + sessionsToTouch.emplace_back(std::move(it->second)); + } + sessions.erase(it++); + } + } - auto it = sessions.begin(); - while (it != sessions.end() && keepAliveBatchSize--) { - if (now < it->second->GetTimeToTouchFast()) - break; + { + auto it = Waiters_.begin(); - if (deletePredicate(it->second.get(), strongClient.get(), sessions.size())) { - sessionsToDelete.emplace_back(std::move(it->second)); - } else { - sessionsToTouch.emplace_back(std::move(it->second)); + while (it != 
Waiters_.end()) { + if (now < it->first + MAX_WAIT_SESSION_TIMEOUT) + break; + + waitersToReplyError.emplace_back(std::move(it->second)); + + Waiters_.erase(it++); } - sessions.erase(it++); } + UpdateStats(); } @@ -2895,6 +3008,11 @@ TPeriodicCb TSessionPoolImpl::CreatePeriodicTask(std::weak_ptr<TTableClient::TIm TTableClient::TImpl::CloseAndDeleteSession(std::move(sessionImpl), strongClient); } } + + for (auto& waiter : waitersToReplyError) { + FakeSessionsCounter_.Inc(); + CreateFakeSession(waiter, strongClient); + } } return true; @@ -2961,11 +3079,13 @@ void TSessionPoolImpl::SetStatCollector(NSdkStats::TStatCollector::TSessionPoolS ActiveSessionsCounter_.Set(statCollector.ActiveSessions); InPoolSessionsCounter_.Set(statCollector.InPoolSessions); FakeSessionsCounter_.Set(statCollector.FakeSessions); + SessionWaiterCounter_.Set(statCollector.Waiters); } void TSessionPoolImpl::UpdateStats() { ActiveSessionsCounter_.Apply(ActiveSessions_); InPoolSessionsCounter_.Apply(Sessions_.size()); + SessionWaiterCounter_.Apply(Waiters_.size()); } TTableClient::TTableClient(const TDriver& driver, const TClientSettings& settings) |