diff options
author | dcherednik <dcherednik@ydb.tech> | 2023-05-22 21:15:42 +0300 |
---|---|---|
committer | dcherednik <dcherednik@ydb.tech> | 2023-05-22 21:15:42 +0300 |
commit | f9ce855990dfd8d19845e20335f10b6658e77ac2 (patch) | |
tree | ecbd85eeb121ebbfe9cc62b2ca7832445e6f3819 | |
parent | 291d21bd2b1341583b714cd77e77a45821eb2e17 (diff) | |
download | ydb-f9ce855990dfd8d19845e20335f10b6658e77ac2.tar.gz |
Revert commit rXXXXXX due to issue during cluster restart.
5 files changed, 33 insertions, 160 deletions
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h b/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h index 2665b62aec..e843cda0e0 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h +++ b/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h @@ -183,18 +183,13 @@ public: struct TSessionPoolStatCollector { TSessionPoolStatCollector(::NMonitoring::TIntGauge* activeSessions = nullptr , ::NMonitoring::TIntGauge* inPoolSessions = nullptr - , ::NMonitoring::TRate* fakeSessions = nullptr - , ::NMonitoring::TIntGauge* waiters = nullptr) - : ActiveSessions(activeSessions) - , InPoolSessions(inPoolSessions) - , FakeSessions(fakeSessions) - , Waiters(waiters) + , ::NMonitoring::TRate* fakeSessions = nullptr) + : ActiveSessions(activeSessions), InPoolSessions(inPoolSessions), FakeSessions(fakeSessions) { } ::NMonitoring::TIntGauge* ActiveSessions; ::NMonitoring::TIntGauge* InPoolSessions; ::NMonitoring::TRate* FakeSessions; - ::NMonitoring::TIntGauge* Waiters; }; struct TClientRetryOperationStatCollector { @@ -271,7 +266,6 @@ public: CacheMiss_.Set(sensorsRegistry->Rate({ DatabaseLabel_, {"sensor", "Request/ClientQueryCacheMiss"} })); ActiveSessions_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "Sessions/InUse"} })); InPoolSessions_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "Sessions/InPool"} })); - Waiters_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "Sessions/WaitForReturn"} })); SessionCV_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "SessionBalancer/Variation"} })); SessionRemovedDueBalancing_.Set(sensorsRegistry->Rate({ DatabaseLabel_, {"sensor", "SessionBalancer/SessionsRemoved"} })); RequestMigrated_.Set(sensorsRegistry->Rate({ DatabaseLabel_, {"sensor", "SessionBalancer/RequestsMigrated"} })); @@ -354,7 +348,7 @@ public: return TSessionPoolStatCollector(); } - return TSessionPoolStatCollector(ActiveSessions_.Get(), InPoolSessions_.Get(), FakeSessions_.Get(), Waiters_.Get()); + return TSessionPoolStatCollector(ActiveSessions_.Get(), InPoolSessions_.Get(), FakeSessions_.Get()); } TClientStatCollector GetClientStatCollector() { @@ -392,7 +386,6 @@ private: TAtomicCounter<::NMonitoring::TRate> DiscoveryFailDueTransportError_; TAtomicPointer<::NMonitoring::TIntGauge> ActiveSessions_; TAtomicPointer<::NMonitoring::TIntGauge> InPoolSessions_; - TAtomicPointer<::NMonitoring::TIntGauge> Waiters_; TAtomicCounter<::NMonitoring::TIntGauge> SessionCV_; TAtomicCounter<::NMonitoring::TRate> SessionRemovedDueBalancing_; TAtomicCounter<::NMonitoring::TRate> RequestMigrated_; diff --git a/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt b/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt index fad066f801..9183195ace 100644 --- a/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt +++ b/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt @@ -1 +1 @@ -2.5.0
\ No newline at end of file +2.4.0
\ No newline at end of file diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp index 35f91b69b0..ff1729043a 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp @@ -40,7 +40,7 @@ void TRequestMigrator::SetHost(ui64 nodeId) { CurHost_ = nodeId; } -bool TRequestMigrator::IsOurSession(const TSession::TImpl* session) const { +bool TRequestMigrator::IsOurSession(TSession::TImpl* session) const { if (!CurHost_) return false; @@ -65,7 +65,7 @@ bool TRequestMigrator::Reset() { } } -bool TRequestMigrator::DoCheckAndMigrate(const TSession::TImpl* session) { +bool TRequestMigrator::DoCheckAndMigrate(TSession::TImpl* session) { if (session->GetEndpoint().empty()) return false; diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h b/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h index fbe6875b3a..2eb7e1b609 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h @@ -51,13 +51,13 @@ public: // Returns false if session is not suitable or unable to get lock to start migration // Returns true if session is suitable in this case Unlink methos on the session is called // This methos is thread safe. - bool DoCheckAndMigrate(const TSession::TImpl* session); + bool DoCheckAndMigrate(TSession::TImpl* session); // Reset migrator to initiall state if migration was not started and returns true // Returns false if migration was started bool Reset(); private: - bool IsOurSession(const TSession::TImpl* session) const; + bool IsOurSession(TSession::TImpl* session) const; ui64 CurHost_ = 0; diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp index b3c65e22f8..0040df8e29 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp @@ -37,8 +37,6 @@ constexpr ui64 KEEP_ALIVE_RANDOM_FRACTION = 4; constexpr TDuration KEEP_ALIVE_CLIENT_TIMEOUT = TDuration::Seconds(5); constexpr ui64 PERIODIC_ACTION_BATCH_SIZE = 10; //Max number of tasks to perform during one interval constexpr ui32 MAX_BACKOFF_DURATION_MS = TDuration::Hours(1).MilliSeconds(); -constexpr TDuration CREATE_SESSION_INTERNAL_TIMEOUT = TDuration::Seconds(2); //Timeout for createSession call inside session pool -constexpr TDuration MAX_WAIT_SESSION_TIMEOUT = TDuration::Seconds(5); //Max time to wait session //////////////////////////////////////////////////////////////////////////////// @@ -1384,10 +1382,7 @@ public: // NOTE: O(n) under session pool lock, should not be used often bool DropSessionOnEndpoint(std::shared_ptr<TTableClient::TImpl> client, ui64 nodeId); // Returns true if session returned to pool successfully - bool ReturnSession(TSession::TImpl* impl, bool active, std::shared_ptr<TTableClient::TImpl> client); - // Returns trun if has waiter and scheduled to create new session - // too feed it - bool CheckAndFeedWaiterNewSession(std::shared_ptr<TTableClient::TImpl> client); + bool ReturnSession(TSession::TImpl* impl, bool active); TPeriodicCb CreatePeriodicTask(std::weak_ptr<TTableClient::TImpl> weakClient, TKeepAliveCmd&& cmd, TDeletePredicate&& predicate); i64 GetActiveSessions() const; i64 GetActiveSessionsLimit() const; @@ -1404,14 +1399,11 @@ private: mutable std::mutex Mtx_; TMultiMap<TInstant, std::unique_ptr<TSession::TImpl>> Sessions_; - TMultiMap<TInstant, NThreading::TPromise<TCreateSessionResult>> Waiters_; - bool Closed_; i64 ActiveSessions_; const ui32 MaxActiveSessions_; NSdkStats::TSessionCounter ActiveSessionsCounter_; NSdkStats::TSessionCounter InPoolSessionsCounter_; - NSdkStats::TSessionCounter SessionWaiterCounter_; NSdkStats::TAtomicCounter<::NMonitoring::TRate> FakeSessionsCounter_; }; @@ -1620,7 +1612,6 @@ public: }; auto keepAliveCmd = [](TSession session) { - Y_VERIFY(session.GetId()); const auto sessionPoolSettings = session.Client_->Settings_.SessionPoolSettings_; @@ -2324,7 +2315,7 @@ public: // Also removes NeedUpdateActiveCounter flag sessionImpl->MarkIdle(); sessionImpl->SetTimeInterval(TDuration::Zero()); - if (!SessionPool_.ReturnSession(sessionImpl, needUpdateCounter, shared_from_this())) { + if (!SessionPool_.ReturnSession(sessionImpl, needUpdateCounter)) { sessionImpl->SetNeedUpdateActiveCounter(needUpdateCounter); return false; } @@ -2332,16 +2323,6 @@ public: } void DeleteSession(TSession::TImpl* sessionImpl) { - // Closing S_STANDALONE session should not fire getting new session - if (sessionImpl->GetState() != TSession::TImpl::S_STANDALONE) { - if (SessionPool_.CheckAndFeedWaiterNewSession(shared_from_this())) { - // We requested new session for waiter which already incremented - // active session counter and old session will be deleted - // - skip update active counter in this case - sessionImpl->SetNeedUpdateActiveCounter(false); - } - } - if (sessionImpl->NeedUpdateActiveCounter()) { SessionPool_.DecrementActiveCounter(); } @@ -2730,8 +2711,8 @@ void TSessionPoolImpl::CreateFakeSession( std::shared_ptr<TTableClient::TImpl> client) { TSession session(client, "", ""); - // Mark standalone to prevent returning to session pool - session.SessionImpl_->MarkStandalone(); + // Mark broken to prevent returning to session pool + session.SessionImpl_->MarkBroken(); TCreateSessionResult val( TStatus( TPlainStatus( @@ -2753,14 +2734,8 @@ TAsyncCreateSessionResult TSessionPoolImpl::GetSession( { auto createSessionPromise = NewPromise<TCreateSessionResult>(); std::unique_ptr<TSession::TImpl> sessionImpl; - // TODO: Remove this flag bool needUpdateActiveSessionCounter = false; - enum class TSessionSource { - Pool, - Waiter, - Error - } sessionSource = TSessionSource::Pool; - + bool returnFakeSession = false; { std::lock_guard guard(Mtx_); if (MaxActiveSessions_) { @@ -2768,12 +2743,7 @@ TAsyncCreateSessionResult TSessionPoolImpl::GetSession( ActiveSessions_++; needUpdateActiveSessionCounter = true; } else { - if (Waiters_.size() < (MaxActiveSessions_ * 10)) { - Waiters_.insert(std::make_pair(TInstant::Now(), createSessionPromise)); - sessionSource = TSessionSource::Waiter; - } else { - sessionSource = TSessionSource::Error; - } + returnFakeSession = true; } } else { ActiveSessions_++; @@ -2786,9 +2756,7 @@ TAsyncCreateSessionResult TSessionPoolImpl::GetSession( } UpdateStats(); } - if (sessionSource == TSessionSource::Waiter) { - return createSessionPromise.GetFuture(); - } else if (sessionSource == TSessionSource::Error) { + if (returnFakeSession) { FakeSessionsCounter_.Inc(); CreateFakeSession(createSessionPromise, client); return createSessionPromise.GetFuture(); @@ -2796,7 +2764,6 @@ TAsyncCreateSessionResult TSessionPoolImpl::GetSession( Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_IDLE); Y_VERIFY(!sessionImpl->GetTimeInterval()); Y_VERIFY(needUpdateActiveSessionCounter); - Y_VERIFY(sessionSource == TSessionSource::Pool); sessionImpl->MarkActive(); sessionImpl->SetNeedUpdateActiveCounter(true); @@ -2811,9 +2778,8 @@ TAsyncCreateSessionResult TSessionPoolImpl::GetSession( return createSessionPromise.GetFuture(); } else { - Y_VERIFY(needUpdateActiveSessionCounter); const auto& sessionResult = client->CreateSession(settings, false); - sessionResult.Subscribe(TSession::TImpl::GetSessionInspector(createSessionPromise, client, settings, 0, true)); + sessionResult.Subscribe(TSession::TImpl::GetSessionInspector(createSessionPromise, client, settings, 0, needUpdateActiveSessionCounter)); return createSessionPromise.GetFuture(); } } @@ -2839,80 +2805,19 @@ bool TSessionPoolImpl::DropSessionOnEndpoint(std::shared_ptr<TTableClient::TImpl return true; } -bool TSessionPoolImpl::CheckAndFeedWaiterNewSession(std::shared_ptr<TTableClient::TImpl> client) { - NThreading::TPromise<TCreateSessionResult> createSessionPromise; +bool TSessionPoolImpl::ReturnSession(TSession::TImpl* impl, bool active) { { std::lock_guard guard(Mtx_); if (Closed_) return false; - - if (Waiters_.empty()) - return false; - - auto it = Waiters_.begin(); - createSessionPromise = it->second; - Waiters_.erase(it); - } - - // This code can be called from client dtors. It may be unsafe to - // execute grpc call directly... - client->ScheduleTaskUnsafe([createSessionPromise, client]() mutable { - TCreateSessionSettings settings; - settings.ClientTimeout(CREATE_SESSION_INTERNAL_TIMEOUT); - - const auto& sessionResult = client->CreateSession(settings, false); - sessionResult.Subscribe(TSession::TImpl::GetSessionInspector(createSessionPromise, client, settings, 0, true)); - }, TDuration()); - return true; -} - -bool TSessionPoolImpl::ReturnSession(TSession::TImpl* impl, bool active, std::shared_ptr<TTableClient::TImpl> client) { - // Do not set promise under the session pool lock - NThreading::TPromise<TCreateSessionResult> createSessionPromise; - { - std::lock_guard guard(Mtx_); - if (Closed_) - return false; - - if (!Waiters_.empty()) { - auto it = Waiters_.begin(); - createSessionPromise = it->second; - Waiters_.erase(it); - - impl->MarkActive(); - - // Check session returned from keep-alive task. - // See the code below. We did ActiveSessions_-- - // when returned user session in to the pool so - // we must increment it before returning session - // to user. - if (!active) - ActiveSessions_++; - impl->SetNeedUpdateActiveCounter(true); - } else { - Sessions_.emplace(std::make_pair(impl->GetTimeToTouchFast(), impl)); - - if (active) { - Y_VERIFY(ActiveSessions_); - ActiveSessions_--; - impl->SetNeedUpdateActiveCounter(false); - } + Sessions_.emplace(std::make_pair(impl->GetTimeToTouchFast(), impl)); + if (active) { + Y_VERIFY(ActiveSessions_); + ActiveSessions_--; + impl->SetNeedUpdateActiveCounter(false); } UpdateStats(); } - - if (createSessionPromise.Initialized()) { - TCreateSessionResult val(TStatus(TPlainStatus()), - TSession(client, std::shared_ptr<TSession::TImpl>( - impl, - TSession::TImpl::GetSmartDeleter(client)))); - - client->ScheduleTaskUnsafe([createSessionPromise, val{std::move(val)}]() mutable { - createSessionPromise.SetValue(std::move(val)); - }, TDuration()); - - } - return true; } @@ -2954,41 +2859,23 @@ TPeriodicCb TSessionPoolImpl::CreatePeriodicTask(std::weak_ptr<TTableClient::TIm sessionsToTouch.reserve(keepAliveBatchSize); TVector<std::unique_ptr<TSession::TImpl>> sessionsToDelete; sessionsToDelete.reserve(keepAliveBatchSize); - TVector<NThreading::TPromise<TCreateSessionResult>> waitersToReplyError; - waitersToReplyError.reserve(keepAliveBatchSize); - const auto now = TInstant::Now(); + auto now = TInstant::Now(); { std::lock_guard guard(Mtx_); - { - auto& sessions = Sessions_; - - auto it = sessions.begin(); - while (it != sessions.end() && keepAliveBatchSize--) { - if (now < it->second->GetTimeToTouchFast()) - break; - - if (deletePredicate(it->second.get(), strongClient.get(), sessions.size())) { - sessionsToDelete.emplace_back(std::move(it->second)); - } else { - sessionsToTouch.emplace_back(std::move(it->second)); - } - sessions.erase(it++); - } - } - - { - auto it = Waiters_.begin(); + auto& sessions = Sessions_; - while (it != Waiters_.end()) { - if (now < it->first + MAX_WAIT_SESSION_TIMEOUT) - break; - - waitersToReplyError.emplace_back(std::move(it->second)); + auto it = sessions.begin(); + while (it != sessions.end() && keepAliveBatchSize--) { + if (now < it->second->GetTimeToTouchFast()) + break; - Waiters_.erase(it++); + if (deletePredicate(it->second.get(), strongClient.get(), sessions.size())) { + sessionsToDelete.emplace_back(std::move(it->second)); + } else { + sessionsToTouch.emplace_back(std::move(it->second)); } + sessions.erase(it++); } - UpdateStats(); } @@ -3008,11 +2895,6 @@ TPeriodicCb TSessionPoolImpl::CreatePeriodicTask(std::weak_ptr<TTableClient::TIm TTableClient::TImpl::CloseAndDeleteSession(std::move(sessionImpl), strongClient); } } - - for (auto& waiter : waitersToReplyError) { - FakeSessionsCounter_.Inc(); - CreateFakeSession(waiter, strongClient); - } } return true; @@ -3079,13 +2961,11 @@ void TSessionPoolImpl::SetStatCollector(NSdkStats::TStatCollector::TSessionPoolS ActiveSessionsCounter_.Set(statCollector.ActiveSessions); InPoolSessionsCounter_.Set(statCollector.InPoolSessions); FakeSessionsCounter_.Set(statCollector.FakeSessions); - SessionWaiterCounter_.Set(statCollector.Waiters); } void TSessionPoolImpl::UpdateStats() { ActiveSessionsCounter_.Apply(ActiveSessions_); InPoolSessionsCounter_.Apply(Sessions_.size()); - SessionWaiterCounter_.Apply(Waiters_.size()); } TTableClient::TTableClient(const TDriver& driver, const TClientSettings& settings) |