aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordcherednik <dcherednik@ydb.tech>2023-05-22 21:15:42 +0300
committerdcherednik <dcherednik@ydb.tech>2023-05-22 21:15:42 +0300
commitf9ce855990dfd8d19845e20335f10b6658e77ac2 (patch)
treeecbd85eeb121ebbfe9cc62b2ca7832445e6f3819
parent291d21bd2b1341583b714cd77e77a45821eb2e17 (diff)
downloadydb-f9ce855990dfd8d19845e20335f10b6658e77ac2.tar.gz
Revert commit rXXXXXX due to issue during cluster restart.
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h13
-rw-r--r--ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/table.cpp170
5 files changed, 33 insertions, 160 deletions
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h b/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h
index 2665b62aec..e843cda0e0 100644
--- a/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h
+++ b/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h
@@ -183,18 +183,13 @@ public:
struct TSessionPoolStatCollector {
TSessionPoolStatCollector(::NMonitoring::TIntGauge* activeSessions = nullptr
, ::NMonitoring::TIntGauge* inPoolSessions = nullptr
- , ::NMonitoring::TRate* fakeSessions = nullptr
- , ::NMonitoring::TIntGauge* waiters = nullptr)
- : ActiveSessions(activeSessions)
- , InPoolSessions(inPoolSessions)
- , FakeSessions(fakeSessions)
- , Waiters(waiters)
+ , ::NMonitoring::TRate* fakeSessions = nullptr)
+ : ActiveSessions(activeSessions), InPoolSessions(inPoolSessions), FakeSessions(fakeSessions)
{ }
::NMonitoring::TIntGauge* ActiveSessions;
::NMonitoring::TIntGauge* InPoolSessions;
::NMonitoring::TRate* FakeSessions;
- ::NMonitoring::TIntGauge* Waiters;
};
struct TClientRetryOperationStatCollector {
@@ -271,7 +266,6 @@ public:
CacheMiss_.Set(sensorsRegistry->Rate({ DatabaseLabel_, {"sensor", "Request/ClientQueryCacheMiss"} }));
ActiveSessions_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "Sessions/InUse"} }));
InPoolSessions_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "Sessions/InPool"} }));
- Waiters_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "Sessions/WaitForReturn"} }));
SessionCV_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "SessionBalancer/Variation"} }));
SessionRemovedDueBalancing_.Set(sensorsRegistry->Rate({ DatabaseLabel_, {"sensor", "SessionBalancer/SessionsRemoved"} }));
RequestMigrated_.Set(sensorsRegistry->Rate({ DatabaseLabel_, {"sensor", "SessionBalancer/RequestsMigrated"} }));
@@ -354,7 +348,7 @@ public:
return TSessionPoolStatCollector();
}
- return TSessionPoolStatCollector(ActiveSessions_.Get(), InPoolSessions_.Get(), FakeSessions_.Get(), Waiters_.Get());
+ return TSessionPoolStatCollector(ActiveSessions_.Get(), InPoolSessions_.Get(), FakeSessions_.Get());
}
TClientStatCollector GetClientStatCollector() {
@@ -392,7 +386,6 @@ private:
TAtomicCounter<::NMonitoring::TRate> DiscoveryFailDueTransportError_;
TAtomicPointer<::NMonitoring::TIntGauge> ActiveSessions_;
TAtomicPointer<::NMonitoring::TIntGauge> InPoolSessions_;
- TAtomicPointer<::NMonitoring::TIntGauge> Waiters_;
TAtomicCounter<::NMonitoring::TIntGauge> SessionCV_;
TAtomicCounter<::NMonitoring::TRate> SessionRemovedDueBalancing_;
TAtomicCounter<::NMonitoring::TRate> RequestMigrated_;
diff --git a/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt b/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt
index fad066f801..9183195ace 100644
--- a/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt
+++ b/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt
@@ -1 +1 @@
-2.5.0 \ No newline at end of file
+2.4.0 \ No newline at end of file
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp
index 35f91b69b0..ff1729043a 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp
@@ -40,7 +40,7 @@ void TRequestMigrator::SetHost(ui64 nodeId) {
CurHost_ = nodeId;
}
-bool TRequestMigrator::IsOurSession(const TSession::TImpl* session) const {
+bool TRequestMigrator::IsOurSession(TSession::TImpl* session) const {
if (!CurHost_)
return false;
@@ -65,7 +65,7 @@ bool TRequestMigrator::Reset() {
}
}
-bool TRequestMigrator::DoCheckAndMigrate(const TSession::TImpl* session) {
+bool TRequestMigrator::DoCheckAndMigrate(TSession::TImpl* session) {
if (session->GetEndpoint().empty())
return false;
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h b/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h
index fbe6875b3a..2eb7e1b609 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h
@@ -51,13 +51,13 @@ public:
// Returns false if session is not suitable or unable to get lock to start migration
// Returns true if session is suitable in this case Unlink methos on the session is called
// This methos is thread safe.
- bool DoCheckAndMigrate(const TSession::TImpl* session);
+ bool DoCheckAndMigrate(TSession::TImpl* session);
// Reset migrator to initiall state if migration was not started and returns true
// Returns false if migration was started
bool Reset();
private:
- bool IsOurSession(const TSession::TImpl* session) const;
+ bool IsOurSession(TSession::TImpl* session) const;
ui64 CurHost_ = 0;
diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp
index b3c65e22f8..0040df8e29 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp
@@ -37,8 +37,6 @@ constexpr ui64 KEEP_ALIVE_RANDOM_FRACTION = 4;
constexpr TDuration KEEP_ALIVE_CLIENT_TIMEOUT = TDuration::Seconds(5);
constexpr ui64 PERIODIC_ACTION_BATCH_SIZE = 10; //Max number of tasks to perform during one interval
constexpr ui32 MAX_BACKOFF_DURATION_MS = TDuration::Hours(1).MilliSeconds();
-constexpr TDuration CREATE_SESSION_INTERNAL_TIMEOUT = TDuration::Seconds(2); //Timeout for createSession call inside session pool
-constexpr TDuration MAX_WAIT_SESSION_TIMEOUT = TDuration::Seconds(5); //Max time to wait session
////////////////////////////////////////////////////////////////////////////////
@@ -1384,10 +1382,7 @@ public:
// NOTE: O(n) under session pool lock, should not be used often
bool DropSessionOnEndpoint(std::shared_ptr<TTableClient::TImpl> client, ui64 nodeId);
// Returns true if session returned to pool successfully
- bool ReturnSession(TSession::TImpl* impl, bool active, std::shared_ptr<TTableClient::TImpl> client);
- // Returns trun if has waiter and scheduled to create new session
- // too feed it
- bool CheckAndFeedWaiterNewSession(std::shared_ptr<TTableClient::TImpl> client);
+ bool ReturnSession(TSession::TImpl* impl, bool active);
TPeriodicCb CreatePeriodicTask(std::weak_ptr<TTableClient::TImpl> weakClient, TKeepAliveCmd&& cmd, TDeletePredicate&& predicate);
i64 GetActiveSessions() const;
i64 GetActiveSessionsLimit() const;
@@ -1404,14 +1399,11 @@ private:
mutable std::mutex Mtx_;
TMultiMap<TInstant, std::unique_ptr<TSession::TImpl>> Sessions_;
- TMultiMap<TInstant, NThreading::TPromise<TCreateSessionResult>> Waiters_;
-
bool Closed_;
i64 ActiveSessions_;
const ui32 MaxActiveSessions_;
NSdkStats::TSessionCounter ActiveSessionsCounter_;
NSdkStats::TSessionCounter InPoolSessionsCounter_;
- NSdkStats::TSessionCounter SessionWaiterCounter_;
NSdkStats::TAtomicCounter<::NMonitoring::TRate> FakeSessionsCounter_;
};
@@ -1620,7 +1612,6 @@ public:
};
auto keepAliveCmd = [](TSession session) {
-
Y_VERIFY(session.GetId());
const auto sessionPoolSettings = session.Client_->Settings_.SessionPoolSettings_;
@@ -2324,7 +2315,7 @@ public:
// Also removes NeedUpdateActiveCounter flag
sessionImpl->MarkIdle();
sessionImpl->SetTimeInterval(TDuration::Zero());
- if (!SessionPool_.ReturnSession(sessionImpl, needUpdateCounter, shared_from_this())) {
+ if (!SessionPool_.ReturnSession(sessionImpl, needUpdateCounter)) {
sessionImpl->SetNeedUpdateActiveCounter(needUpdateCounter);
return false;
}
@@ -2332,16 +2323,6 @@ public:
}
void DeleteSession(TSession::TImpl* sessionImpl) {
- // Closing S_STANDALONE session should not fire getting new session
- if (sessionImpl->GetState() != TSession::TImpl::S_STANDALONE) {
- if (SessionPool_.CheckAndFeedWaiterNewSession(shared_from_this())) {
- // We requested new session for waiter which already incremented
- // active session counter and old session will be deleted
- // - skip update active counter in this case
- sessionImpl->SetNeedUpdateActiveCounter(false);
- }
- }
-
if (sessionImpl->NeedUpdateActiveCounter()) {
SessionPool_.DecrementActiveCounter();
}
@@ -2730,8 +2711,8 @@ void TSessionPoolImpl::CreateFakeSession(
std::shared_ptr<TTableClient::TImpl> client)
{
TSession session(client, "", "");
- // Mark standalone to prevent returning to session pool
- session.SessionImpl_->MarkStandalone();
+ // Mark broken to prevent returning to session pool
+ session.SessionImpl_->MarkBroken();
TCreateSessionResult val(
TStatus(
TPlainStatus(
@@ -2753,14 +2734,8 @@ TAsyncCreateSessionResult TSessionPoolImpl::GetSession(
{
auto createSessionPromise = NewPromise<TCreateSessionResult>();
std::unique_ptr<TSession::TImpl> sessionImpl;
- // TODO: Remove this flag
bool needUpdateActiveSessionCounter = false;
- enum class TSessionSource {
- Pool,
- Waiter,
- Error
- } sessionSource = TSessionSource::Pool;
-
+ bool returnFakeSession = false;
{
std::lock_guard guard(Mtx_);
if (MaxActiveSessions_) {
@@ -2768,12 +2743,7 @@ TAsyncCreateSessionResult TSessionPoolImpl::GetSession(
ActiveSessions_++;
needUpdateActiveSessionCounter = true;
} else {
- if (Waiters_.size() < (MaxActiveSessions_ * 10)) {
- Waiters_.insert(std::make_pair(TInstant::Now(), createSessionPromise));
- sessionSource = TSessionSource::Waiter;
- } else {
- sessionSource = TSessionSource::Error;
- }
+ returnFakeSession = true;
}
} else {
ActiveSessions_++;
@@ -2786,9 +2756,7 @@ TAsyncCreateSessionResult TSessionPoolImpl::GetSession(
}
UpdateStats();
}
- if (sessionSource == TSessionSource::Waiter) {
- return createSessionPromise.GetFuture();
- } else if (sessionSource == TSessionSource::Error) {
+ if (returnFakeSession) {
FakeSessionsCounter_.Inc();
CreateFakeSession(createSessionPromise, client);
return createSessionPromise.GetFuture();
@@ -2796,7 +2764,6 @@ TAsyncCreateSessionResult TSessionPoolImpl::GetSession(
Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_IDLE);
Y_VERIFY(!sessionImpl->GetTimeInterval());
Y_VERIFY(needUpdateActiveSessionCounter);
- Y_VERIFY(sessionSource == TSessionSource::Pool);
sessionImpl->MarkActive();
sessionImpl->SetNeedUpdateActiveCounter(true);
@@ -2811,9 +2778,8 @@ TAsyncCreateSessionResult TSessionPoolImpl::GetSession(
return createSessionPromise.GetFuture();
} else {
- Y_VERIFY(needUpdateActiveSessionCounter);
const auto& sessionResult = client->CreateSession(settings, false);
- sessionResult.Subscribe(TSession::TImpl::GetSessionInspector(createSessionPromise, client, settings, 0, true));
+ sessionResult.Subscribe(TSession::TImpl::GetSessionInspector(createSessionPromise, client, settings, 0, needUpdateActiveSessionCounter));
return createSessionPromise.GetFuture();
}
}
@@ -2839,80 +2805,19 @@ bool TSessionPoolImpl::DropSessionOnEndpoint(std::shared_ptr<TTableClient::TImpl
return true;
}
-bool TSessionPoolImpl::CheckAndFeedWaiterNewSession(std::shared_ptr<TTableClient::TImpl> client) {
- NThreading::TPromise<TCreateSessionResult> createSessionPromise;
+bool TSessionPoolImpl::ReturnSession(TSession::TImpl* impl, bool active) {
{
std::lock_guard guard(Mtx_);
if (Closed_)
return false;
-
- if (Waiters_.empty())
- return false;
-
- auto it = Waiters_.begin();
- createSessionPromise = it->second;
- Waiters_.erase(it);
- }
-
- // This code can be called from client dtors. It may be unsafe to
- // execute grpc call directly...
- client->ScheduleTaskUnsafe([createSessionPromise, client]() mutable {
- TCreateSessionSettings settings;
- settings.ClientTimeout(CREATE_SESSION_INTERNAL_TIMEOUT);
-
- const auto& sessionResult = client->CreateSession(settings, false);
- sessionResult.Subscribe(TSession::TImpl::GetSessionInspector(createSessionPromise, client, settings, 0, true));
- }, TDuration());
- return true;
-}
-
-bool TSessionPoolImpl::ReturnSession(TSession::TImpl* impl, bool active, std::shared_ptr<TTableClient::TImpl> client) {
- // Do not set promise under the session pool lock
- NThreading::TPromise<TCreateSessionResult> createSessionPromise;
- {
- std::lock_guard guard(Mtx_);
- if (Closed_)
- return false;
-
- if (!Waiters_.empty()) {
- auto it = Waiters_.begin();
- createSessionPromise = it->second;
- Waiters_.erase(it);
-
- impl->MarkActive();
-
- // Check session returned from keep-alive task.
- // See the code below. We did ActiveSessions_--
- // when returned user session in to the pool so
- // we must increment it before returning session
- // to user.
- if (!active)
- ActiveSessions_++;
- impl->SetNeedUpdateActiveCounter(true);
- } else {
- Sessions_.emplace(std::make_pair(impl->GetTimeToTouchFast(), impl));
-
- if (active) {
- Y_VERIFY(ActiveSessions_);
- ActiveSessions_--;
- impl->SetNeedUpdateActiveCounter(false);
- }
+ Sessions_.emplace(std::make_pair(impl->GetTimeToTouchFast(), impl));
+ if (active) {
+ Y_VERIFY(ActiveSessions_);
+ ActiveSessions_--;
+ impl->SetNeedUpdateActiveCounter(false);
}
UpdateStats();
}
-
- if (createSessionPromise.Initialized()) {
- TCreateSessionResult val(TStatus(TPlainStatus()),
- TSession(client, std::shared_ptr<TSession::TImpl>(
- impl,
- TSession::TImpl::GetSmartDeleter(client))));
-
- client->ScheduleTaskUnsafe([createSessionPromise, val{std::move(val)}]() mutable {
- createSessionPromise.SetValue(std::move(val));
- }, TDuration());
-
- }
-
return true;
}
@@ -2954,41 +2859,23 @@ TPeriodicCb TSessionPoolImpl::CreatePeriodicTask(std::weak_ptr<TTableClient::TIm
sessionsToTouch.reserve(keepAliveBatchSize);
TVector<std::unique_ptr<TSession::TImpl>> sessionsToDelete;
sessionsToDelete.reserve(keepAliveBatchSize);
- TVector<NThreading::TPromise<TCreateSessionResult>> waitersToReplyError;
- waitersToReplyError.reserve(keepAliveBatchSize);
- const auto now = TInstant::Now();
+ auto now = TInstant::Now();
{
std::lock_guard guard(Mtx_);
- {
- auto& sessions = Sessions_;
-
- auto it = sessions.begin();
- while (it != sessions.end() && keepAliveBatchSize--) {
- if (now < it->second->GetTimeToTouchFast())
- break;
-
- if (deletePredicate(it->second.get(), strongClient.get(), sessions.size())) {
- sessionsToDelete.emplace_back(std::move(it->second));
- } else {
- sessionsToTouch.emplace_back(std::move(it->second));
- }
- sessions.erase(it++);
- }
- }
-
- {
- auto it = Waiters_.begin();
+ auto& sessions = Sessions_;
- while (it != Waiters_.end()) {
- if (now < it->first + MAX_WAIT_SESSION_TIMEOUT)
- break;
-
- waitersToReplyError.emplace_back(std::move(it->second));
+ auto it = sessions.begin();
+ while (it != sessions.end() && keepAliveBatchSize--) {
+ if (now < it->second->GetTimeToTouchFast())
+ break;
- Waiters_.erase(it++);
+ if (deletePredicate(it->second.get(), strongClient.get(), sessions.size())) {
+ sessionsToDelete.emplace_back(std::move(it->second));
+ } else {
+ sessionsToTouch.emplace_back(std::move(it->second));
}
+ sessions.erase(it++);
}
-
UpdateStats();
}
@@ -3008,11 +2895,6 @@ TPeriodicCb TSessionPoolImpl::CreatePeriodicTask(std::weak_ptr<TTableClient::TIm
TTableClient::TImpl::CloseAndDeleteSession(std::move(sessionImpl), strongClient);
}
}
-
- for (auto& waiter : waitersToReplyError) {
- FakeSessionsCounter_.Inc();
- CreateFakeSession(waiter, strongClient);
- }
}
return true;
@@ -3079,13 +2961,11 @@ void TSessionPoolImpl::SetStatCollector(NSdkStats::TStatCollector::TSessionPoolS
ActiveSessionsCounter_.Set(statCollector.ActiveSessions);
InPoolSessionsCounter_.Set(statCollector.InPoolSessions);
FakeSessionsCounter_.Set(statCollector.FakeSessions);
- SessionWaiterCounter_.Set(statCollector.Waiters);
}
void TSessionPoolImpl::UpdateStats() {
ActiveSessionsCounter_.Apply(ActiveSessions_);
InPoolSessionsCounter_.Apply(Sessions_.size());
- SessionWaiterCounter_.Apply(Waiters_.size());
}
TTableClient::TTableClient(const TDriver& driver, const TClientSettings& settings)