diff options
author | dcherednik <dcherednik@ydb.tech> | 2023-02-06 22:32:35 +0300 |
---|---|---|
committer | dcherednik <dcherednik@ydb.tech> | 2023-02-06 22:32:35 +0300 |
commit | f407bf8f996733b3df766c4db277fa3cdb40e2a1 (patch) | |
tree | fb0e0bb42b338d95e57b9ce69b232769b9834b38 | |
parent | 6cabb77676668397e5ac0afacc8c68b54c41c86b (diff) | |
download | ydb-f407bf8f996733b3df766c4db277fa3cdb40e2a1.tar.gz |
Remove settler from c++ sdk.
6 files changed, 23 insertions, 215 deletions
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h b/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h index a6222a656ee..e843cda0e0b 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h +++ b/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h @@ -181,12 +181,6 @@ public: }; struct TSessionPoolStatCollector { - - enum class EStatCollectorType: size_t { - SESSIONPOOL, - SETTLERPOOL - }; - TSessionPoolStatCollector(::NMonitoring::TIntGauge* activeSessions = nullptr , ::NMonitoring::TIntGauge* inPoolSessions = nullptr , ::NMonitoring::TRate* fakeSessions = nullptr) @@ -272,7 +266,6 @@ public: CacheMiss_.Set(sensorsRegistry->Rate({ DatabaseLabel_, {"sensor", "Request/ClientQueryCacheMiss"} })); ActiveSessions_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "Sessions/InUse"} })); InPoolSessions_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "Sessions/InPool"} })); - SettlerSessions_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "Sessions/InSettler"} })); SessionCV_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "SessionBalancer/Variation"} })); SessionRemovedDueBalancing_.Set(sensorsRegistry->Rate({ DatabaseLabel_, {"sensor", "SessionBalancer/SessionsRemoved"} })); RequestMigrated_.Set(sensorsRegistry->Rate({ DatabaseLabel_, {"sensor", "SessionBalancer/RequestsMigrated"} })); @@ -350,20 +343,12 @@ public: } } - TSessionPoolStatCollector GetSessionPoolStatCollector(TSessionPoolStatCollector::EStatCollectorType type) { + TSessionPoolStatCollector GetSessionPoolStatCollector() { if (!IsCollecting()) { return TSessionPoolStatCollector(); } - switch (type) { - case TSessionPoolStatCollector::EStatCollectorType::SESSIONPOOL: - return TSessionPoolStatCollector(ActiveSessions_.Get(), InPoolSessions_.Get(), FakeSessions_.Get()); - - case TSessionPoolStatCollector::EStatCollectorType::SETTLERPOOL: - return TSessionPoolStatCollector(nullptr, SettlerSessions_.Get(), nullptr); - } - - return TSessionPoolStatCollector(); + return TSessionPoolStatCollector(ActiveSessions_.Get(), InPoolSessions_.Get(), FakeSessions_.Get()); } TClientStatCollector GetClientStatCollector() { @@ -401,7 +386,6 @@ private: TAtomicCounter<::NMonitoring::TRate> DiscoveryFailDueTransportError_; TAtomicPointer<::NMonitoring::TIntGauge> ActiveSessions_; TAtomicPointer<::NMonitoring::TIntGauge> InPoolSessions_; - TAtomicPointer<::NMonitoring::TIntGauge> SettlerSessions_; TAtomicCounter<::NMonitoring::TIntGauge> SessionCV_; TAtomicCounter<::NMonitoring::TRate> SessionRemovedDueBalancing_; TAtomicCounter<::NMonitoring::TRate> RequestMigrated_; diff --git a/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt b/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt index cc6612c36e0..9183195ace7 100644 --- a/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt +++ b/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt @@ -1 +1 @@ -2.3.0
\ No newline at end of file +2.4.0
\ No newline at end of file diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.cpp index 289e7a8bb74..7cc29254802 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.cpp @@ -89,16 +89,6 @@ void TSession::TImpl::MarkIdle() { NeedUpdateActiveCounter_ = false; } -// Can be called from interceptor, need lock -void TSession::TImpl::MarkDisconnected() { - with_lock(Lock_) { - if (State_ == EState::S_ACTIVE) { - NeedUpdateActiveCounter_ = true; - } - State_ = EState::S_DISCONNECTED; - } -} - TSession::TImpl::EState TSession::TImpl::GetState() const { // See comments in InjectSessionStatusInterception about lock with_lock(Lock_) { diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.h b/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.h index 1e0a47db661..753016e25e7 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.h +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.h @@ -34,7 +34,6 @@ public: S_IDLE, S_BROKEN, S_ACTIVE, - S_DISCONNECTED, S_CLOSING }; @@ -60,7 +59,6 @@ public: void MarkStandalone(); void MarkActive(); void MarkIdle(); - void MarkDisconnected(); EState GetState() const; void SetNeedUpdateActiveCounter(bool flag); bool NeedUpdateActiveCounter() const; diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp index 8797a3332c2..0cf6b6ba9c0 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp @@ -31,8 +31,6 @@ using namespace NThreading; //How often run session pool keep alive check constexpr TDuration PERIODIC_ACTION_INTERVAL = TDuration::Seconds(5); -//How often run settler keep alive check -constexpr TDuration SETTLER_PERIODIC_ACTION_INTERVAL = TDuration::Seconds(1); //How ofter run host scan to perform session balancing constexpr TDuration HOSTSCAN_PERIODIC_ACTION_INTERVAL = TDuration::Seconds(2); constexpr ui64 KEEP_ALIVE_RANDOM_FRACTION = 4; @@ -1372,8 +1370,7 @@ public: // this is used for additional total session count limitation TAsyncCreateSessionResult GetSession( std::shared_ptr<TTableClient::TImpl> client, - const TCreateSessionSettings& settings, - TAwareSessonProvider sessionProvider); + const TCreateSessionSettings& settings); // Returns true if session was extracted from session pool and dropped via smart deleter // Returns false if session for given endpoint was not found // NOTE: O(n) under session pool lock, should not be used often @@ -1465,10 +1462,9 @@ NThreading::TFuture<TResponse> InjectSessionStatusInterception( // since we have guarantee this request wasn't been started to execute. if (status.IsTransportError() && status.GetStatus() != EStatus::CLIENT_RESOURCE_EXHAUSTED) { - // Mark disconnected - the session will be returnet to settler - impl->MarkDisconnected(); + impl->MarkBroken(); } else if (status.GetStatus() == EStatus::SESSION_BUSY) { - impl->MarkDisconnected(); + impl->MarkBroken(); } else if (status.GetStatus() == EStatus::BAD_SESSION) { impl->MarkBroken(); } else if (IsSessionCloseRequested(status)) { @@ -1516,19 +1512,13 @@ public: : TClientImplCommon(std::move(connections), settings) , Settings_(settings) , SessionPool_(Settings_.SessionPoolSettings_.MaxActiveSessions_) - , SettlerPool_(0) { if (!DbDriverState_->StatCollector.IsCollecting()) { return; } SetStatCollector(DbDriverState_->StatCollector.GetClientStatCollector()); - SessionPool_.SetStatCollector(DbDriverState_->StatCollector.GetSessionPoolStatCollector( - NSdkStats::TStatCollector::TSessionPoolStatCollector::EStatCollectorType::SESSIONPOOL - )); - SettlerPool_.SetStatCollector(DbDriverState_->StatCollector.GetSessionPoolStatCollector( - NSdkStats::TStatCollector::TSessionPoolStatCollector::EStatCollectorType::SETTLERPOOL - )); + SessionPool_.SetStatCollector(DbDriverState_->StatCollector.GetSessionPoolStatCollector()); } ~TImpl() { @@ -1565,7 +1555,6 @@ public: return true; }; SessionPool_.Drain(drainer, true); - SettlerPool_.Drain(drainer, true); TVector<TAsyncStatus> closeResults; for (auto& s : sessions) { if (s->GetId()) { @@ -1668,86 +1657,6 @@ public: ), PERIODIC_ACTION_INTERVAL); } - static void DoSettlerKeepAlive(TSession&& session) { - auto curThreshold = session.SessionImpl_->GetTimeInterval(); - if (!curThreshold) { - session.SessionImpl_->SetTimeInterval(TDuration::Seconds(1)); - } else { - session.SessionImpl_->SetTimeInterval(TDuration::FromValue(curThreshold.GetValue() * 2)); - } - - // Bypass InjectSessionStatusInterception to simplify logic - std::shared_ptr<TTableClient::TImpl> clientImpl = session.Client_; - clientImpl->KeepAlive(session.SessionImpl_.get(), KeepAliveSettings) - .Subscribe([session = std::move(session)](TAsyncKeepAliveResult asyncResult) { - - auto result = asyncResult.ExtractValue(); - Y_VERIFY(session.SessionImpl_->GetState() == TSession::TImpl::EState::S_DISCONNECTED); - - if (!result.IsTransportError()) { - if (result.IsSuccess()) { - switch (result.GetSessionStatus()) { - case ESessionStatus::Ready: - // Ready state, mark Idle to return to session pool after dtor call - session.SessionImpl_->MarkIdle(); - session.SessionImpl_->SetTimeInterval(TDuration::Zero()); - session.SessionImpl_->ScheduleTimeToTouchFast( - RandomizeThreshold( - GetMinTimeToTouch(session.Client_->Settings_.SessionPoolSettings_) - ), - true - ); - break; - case ESessionStatus::Unspecified: - // Old server, we have no information about server status - close it - session.SessionImpl_->MarkBroken(); - break; - case ESessionStatus::Busy: - // Session should still be in S_DISCONNECTED state - break; - default: - Y_VERIFY(false, "unimplemented"); - } - } else { - session.SessionImpl_->MarkBroken(); - } - } - if (session.SessionImpl_->GetState() == TSession::TImpl::EState::S_DISCONNECTED) { - session.SessionImpl_->ScheduleTimeToTouchFast( - RandomizeThreshold(session.SessionImpl_->GetTimeInterval()), - false - ); - } - }); - } - - void StartPeriodicSettlerTask() { - - auto deletePredicate = [](TSession::TImpl* , TTableClient::TImpl* , size_t) { - return false; - }; - - auto ttl = Settings_.SettlerSessionPoolTTL_; - auto keepAliveCmd = [ttl](TSession session) { - Y_VERIFY(session.GetId()); - Y_VERIFY(!session.SessionImpl_->NeedUpdateActiveCounter()); - // Here we can change session associated variables without additional locking - - if ((session.SessionImpl_->GetTimeInterval().Seconds() * 2 - 1) < ttl) { - DoSettlerKeepAlive(std::move(session)); - } else { - session.SessionImpl_->MarkBroken(); - } - }; - std::weak_ptr<TTableClient::TImpl> weak = shared_from_this(); - Connections_->AddPeriodicTask( - SettlerPool_.CreatePeriodicTask( - weak, - std::move(keepAliveCmd), - std::move(deletePredicate) - ), SETTLER_PERIODIC_ACTION_INTERVAL); - } - static ui64 ScanForeignLocations(std::shared_ptr<TTableClient::TImpl> client) { size_t max = 0; ui64 result = 0; @@ -1876,7 +1785,7 @@ public: } TAsyncCreateSessionResult GetSession(const TCreateSessionSettings& settings) { - return SessionPool_.GetSession(shared_from_this(), settings, &SettlerAwareSessonProvider); + return SessionPool_.GetSession(shared_from_this(), settings); } i64 GetActiveSessionCount() const { @@ -2388,50 +2297,21 @@ public: } bool ReturnSession(TSession::TImpl* sessionImpl) { - Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_ACTIVE|| - sessionImpl->GetState() == TSession::TImpl::S_DISCONNECTED || + Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_ACTIVE || sessionImpl->GetState() == TSession::TImpl::S_IDLE); if (RequestMigrator_.DoCheckAndMigrate(sessionImpl)) { SessionRemovedDueBalancing.Inc(); return false; } - if (sessionImpl->GetState() == TSession::TImpl::S_ACTIVE || - sessionImpl->GetState() == TSession::TImpl::S_IDLE) { - bool needUpdateCounter = sessionImpl->NeedUpdateActiveCounter(); - // Also removes NeedUpdateActiveCounter flag - sessionImpl->MarkIdle(); - sessionImpl->SetTimeInterval(TDuration::Zero()); - if (!SessionPool_.ReturnSession(sessionImpl, needUpdateCounter)) { - sessionImpl->SetNeedUpdateActiveCounter(needUpdateCounter); - return false; - } - } else { - if (sessionImpl->NeedUpdateActiveCounter()) { - SessionPool_.DecrementActiveCounter(); - sessionImpl->SetNeedUpdateActiveCounter(false); - } - if (!sessionImpl->GetTimeInterval()) { - // Settler keep-alive task set time interval to non zero value always - // this value is zero if session in session pool or active state. - // So, at _this_ point we know session was returned _not_ from settler keep-alive task: - // - we can just run keep-alive task here - - // To start time counting from scratch - sessionImpl->ScheduleTimeToTouchFast(TDuration::Zero(), true); - auto client = shared_from_this(); - TSession session(client, - std::shared_ptr<TSession::TImpl>( - sessionImpl, - TSession::TImpl::GetSmartDeleter(client))); - DoSettlerKeepAlive(std::move(session)); - } else { - // Session returns from settler keep-alive task - if (!SettlerPool_.ReturnSession(sessionImpl, false)) { - return false; - } - } + bool needUpdateCounter = sessionImpl->NeedUpdateActiveCounter(); + // Also removes NeedUpdateActiveCounter flag + sessionImpl->MarkIdle(); + sessionImpl->SetTimeInterval(TDuration::Zero()); + if (!SessionPool_.ReturnSession(sessionImpl, needUpdateCounter)) { + sessionImpl->SetNeedUpdateActiveCounter(needUpdateCounter); + return false; } return true; } @@ -2791,10 +2671,6 @@ private: return queryData.GetText(); } - static TAsyncCreateSessionResult SettlerAwareSessonProvider( - std::shared_ptr<TTableClient::TImpl> client, - const TCreateSessionSettings& settings); - public: NSdkStats::TAtomicCounter<::NMonitoring::TRate> CacheMissCounter; NSdkStats::TStatCollector::TClientRetryOperationStatCollector RetryOperationStatCollector; @@ -2805,7 +2681,6 @@ public: private: TSessionPoolImpl SessionPool_; - TSessionPoolImpl SettlerPool_; TRequestMigrator RequestMigrator_; static const TKeepAliveSettings KeepAliveSettings; }; @@ -2827,36 +2702,6 @@ void TTableClient::TImpl::CloseAndDeleteSession(std::unique_ptr<TSession::TImpl> deleteSession->MarkBroken(); } -TAsyncCreateSessionResult TTableClient::TImpl::SettlerAwareSessonProvider( - std::shared_ptr<TTableClient::TImpl> client, - const TCreateSessionSettings& settings) -{ - if (client->SessionPool_.GetActiveSessionsLimit() == 0) { - return client->CreateSession(settings, false); - } - - auto sessionsInSettler = client->SettlerPool_.GetCurrentPoolSize(); - // The purpose is just protect server from huge number of sessions in case - // of transport errors, this check is a bit inaccurate - - // The alternative way is to close sessions in settler during periodic task, - // but it can cause additional latency to close. - - if (sessionsInSettler > client->SessionPool_.GetActiveSessionsLimit()) { - std::unique_ptr<TSession::TImpl> session; - auto drainer = [&session](std::unique_ptr<TSession::TImpl>&& impl) mutable { - session = std::move(impl); - return false; - }; - client->SettlerPool_.Drain(drainer, false); - if (session) { - CloseAndDeleteSession(std::move(session), client); - } - } - - return client->CreateSession(settings, false); -} - void TSessionPoolImpl::CreateFakeSession( NThreading::TPromise<TCreateSessionResult>& promise, std::shared_ptr<TTableClient::TImpl> client) @@ -2881,8 +2726,7 @@ void TSessionPoolImpl::CreateFakeSession( TAsyncCreateSessionResult TSessionPoolImpl::GetSession( std::shared_ptr<TTableClient::TImpl> client, - const TCreateSessionSettings& settings, - TAwareSessonProvider sessionProvider) + const TCreateSessionSettings& settings) { auto createSessionPromise = NewPromise<TCreateSessionResult>(); std::unique_ptr<TSession::TImpl> sessionImpl; @@ -2930,7 +2774,7 @@ TAsyncCreateSessionResult TSessionPoolImpl::GetSession( return createSessionPromise.GetFuture(); } else { - const auto& sessionResult = sessionProvider(client, settings); + const auto& sessionResult = client->CreateSession(settings, false); sessionResult.Subscribe(TSession::TImpl::GetSessionInspector(createSessionPromise, client, settings, 0, needUpdateActiveSessionCounter)); return createSessionPromise.GetFuture(); } @@ -3033,8 +2877,7 @@ TPeriodicCb TSessionPoolImpl::CreatePeriodicTask(std::weak_ptr<TTableClient::TIm for (auto& sessionImpl : sessionsToTouch) { if (sessionImpl) { - Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_IDLE || - sessionImpl->GetState() == TSession::TImpl::S_DISCONNECTED); + Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_IDLE); TSession session(strongClient, std::shared_ptr<TSession::TImpl>( sessionImpl.release(), TSession::TImpl::GetSmartDeleter(strongClient))); @@ -3044,8 +2887,7 @@ TPeriodicCb TSessionPoolImpl::CreatePeriodicTask(std::weak_ptr<TTableClient::TIm for (auto& sessionImpl : sessionsToDelete) { if (sessionImpl) { - Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_IDLE || - sessionImpl->GetState() == TSession::TImpl::S_DISCONNECTED); + Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_IDLE); TTableClient::TImpl::CloseAndDeleteSession(std::move(sessionImpl), strongClient); } } @@ -3125,7 +2967,6 @@ void TSessionPoolImpl::UpdateStats() { TTableClient::TTableClient(const TDriver& driver, const TClientSettings& settings) : Impl_(new TImpl(CreateInternalInterface(driver), settings)) { Impl_->StartPeriodicSessionPoolTask(); - Impl_->StartPeriodicSettlerTask(); Impl_->StartPeriodicHostScanTask(); Impl_->InitStopper(); } @@ -4197,8 +4038,7 @@ std::function<void(TSession::TImpl*)> TSession::TImpl::GetSmartDeleter(std::shar client->DeleteSession(sessionImpl); break; case TSession::TImpl::S_IDLE: - case TSession::TImpl::S_ACTIVE: - case TSession::TImpl::S_DISCONNECTED: { + case TSession::TImpl::S_ACTIVE: { if (!client->ReturnSession(sessionImpl)) { client->DeleteSession(sessionImpl); } diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.h b/ydb/public/sdk/cpp/client/ydb_table/table.h index dfc62d09e37..8896eebc736 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.h +++ b/ydb/public/sdk/cpp/client/ydb_table/table.h @@ -930,12 +930,8 @@ struct TClientSettings : public TCommonClientSettingsBase<TClientSettings> { // Allow migrate requests between session during session balancing FLUENT_SETTING_DEFAULT(bool, AllowRequestMigration, true); - // Max number of seconds to keep session in SettlerSessionPool - // Settler pool is a session pool is used to keep - // sessions in case of errors when we dont known actual server - // status (e.g. CLIENT_RESOURCE_EXHAUSTED). In this case - // sessions from this pool we be keep alived with progressive timeout - // to try to return it to main session pool + // This feature has been removed as inefficient + // the settings will be removed in next releases FLUENT_SETTING_DEFAULT(ui32, SettlerSessionPoolTTL, 100); // Settings of session pool FLUENT_SETTING(TSessionPoolSettings, SessionPoolSettings); |