diff options
author | robdrynkin <robdrynkin@yandex-team.com> | 2023-06-12 10:52:35 +0300 |
---|---|---|
committer | robdrynkin <robdrynkin@yandex-team.com> | 2023-06-12 10:52:35 +0300 |
commit | d4d1325a138f2bea1fbb4aa9ef8266382f7a09d1 (patch) | |
tree | 325e185ce471e523420dff780e94ac739269ef0f | |
parent | 90abfcac34c50a1d082fd4e6f6e237a4b56d31c5 (diff) | |
download | ydb-d4d1325a138f2bea1fbb4aa9ef8266382f7a09d1.tar.gz |
Add waiters queue + refactoring
28 files changed, 2947 insertions, 1740 deletions
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h b/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h index e843cda0e0..2665b62aec 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h +++ b/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h @@ -183,13 +183,18 @@ public: struct TSessionPoolStatCollector { TSessionPoolStatCollector(::NMonitoring::TIntGauge* activeSessions = nullptr , ::NMonitoring::TIntGauge* inPoolSessions = nullptr - , ::NMonitoring::TRate* fakeSessions = nullptr) - : ActiveSessions(activeSessions), InPoolSessions(inPoolSessions), FakeSessions(fakeSessions) + , ::NMonitoring::TRate* fakeSessions = nullptr + , ::NMonitoring::TIntGauge* waiters = nullptr) + : ActiveSessions(activeSessions) + , InPoolSessions(inPoolSessions) + , FakeSessions(fakeSessions) + , Waiters(waiters) { } ::NMonitoring::TIntGauge* ActiveSessions; ::NMonitoring::TIntGauge* InPoolSessions; ::NMonitoring::TRate* FakeSessions; + ::NMonitoring::TIntGauge* Waiters; }; struct TClientRetryOperationStatCollector { @@ -266,6 +271,7 @@ public: CacheMiss_.Set(sensorsRegistry->Rate({ DatabaseLabel_, {"sensor", "Request/ClientQueryCacheMiss"} })); ActiveSessions_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "Sessions/InUse"} })); InPoolSessions_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "Sessions/InPool"} })); + Waiters_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "Sessions/WaitForReturn"} })); SessionCV_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "SessionBalancer/Variation"} })); SessionRemovedDueBalancing_.Set(sensorsRegistry->Rate({ DatabaseLabel_, {"sensor", "SessionBalancer/SessionsRemoved"} })); RequestMigrated_.Set(sensorsRegistry->Rate({ DatabaseLabel_, {"sensor", "SessionBalancer/RequestsMigrated"} })); @@ -348,7 +354,7 @@ public: return TSessionPoolStatCollector(); } - return TSessionPoolStatCollector(ActiveSessions_.Get(), InPoolSessions_.Get(), FakeSessions_.Get()); + return TSessionPoolStatCollector(ActiveSessions_.Get(), InPoolSessions_.Get(), FakeSessions_.Get(), Waiters_.Get()); } TClientStatCollector GetClientStatCollector() { @@ -386,6 +392,7 @@ private: TAtomicCounter<::NMonitoring::TRate> DiscoveryFailDueTransportError_; TAtomicPointer<::NMonitoring::TIntGauge> ActiveSessions_; TAtomicPointer<::NMonitoring::TIntGauge> InPoolSessions_; + TAtomicPointer<::NMonitoring::TIntGauge> Waiters_; TAtomicCounter<::NMonitoring::TIntGauge> SessionCV_; TAtomicCounter<::NMonitoring::TRate> SessionRemovedDueBalancing_; TAtomicCounter<::NMonitoring::TRate> RequestMigrated_; diff --git a/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt b/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt index 9183195ace..fad066f801 100644 --- a/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt +++ b/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt @@ -1 +1 @@ -2.4.0
\ No newline at end of file +2.5.0
\ No newline at end of file diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.darwin-x86_64.txt index 2569e1c3f0..a82f241d2c 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.darwin-x86_64.txt +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.darwin-x86_64.txt @@ -21,5 +21,8 @@ target_link_libraries(client-ydb_table-impl PUBLIC target_sources(client-ydb_table-impl PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/data_query.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/readers.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp ) diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.linux-aarch64.txt index e56b888c90..b3b691a276 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.linux-aarch64.txt +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.linux-aarch64.txt @@ -22,5 +22,8 @@ target_link_libraries(client-ydb_table-impl PUBLIC target_sources(client-ydb_table-impl PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/data_query.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/readers.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp ) diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.linux-x86_64.txt index e56b888c90..b3b691a276 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.linux-x86_64.txt +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.linux-x86_64.txt @@ -22,5 +22,8 @@ target_link_libraries(client-ydb_table-impl PUBLIC target_sources(client-ydb_table-impl PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/data_query.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/readers.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp ) diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.windows-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.windows-x86_64.txt index 2569e1c3f0..a82f241d2c 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.windows-x86_64.txt +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.windows-x86_64.txt @@ -21,5 +21,8 @@ target_link_libraries(client-ydb_table-impl PUBLIC target_sources(client-ydb_table-impl PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/data_query.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/readers.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp ) diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.cpp index 7cc2925480..a15d9d9fa1 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.cpp @@ -26,7 +26,7 @@ ui64 GetNodeIdFromSession(const TString& sessionId) { return 0; } -TSession::TImpl::TImpl(const TString& sessionId, const TString& endpoint, bool useQueryCache, ui32 queryCacheSize) +TSession::TImpl::TImpl(const TString& sessionId, const TString& endpoint, bool useQueryCache, ui32 queryCacheSize, bool isOwnedBySessionPool) : SessionId_(sessionId) , EndpointKey_(endpoint, GetNodeIdFromSession(sessionId)) , State_(S_STANDALONE) @@ -36,6 +36,7 @@ TSession::TImpl::TImpl(const TString& sessionId, const TString& endpoint, bool u , TimeToTouch_(TInstant::Now()) , TimeInPast_(TInstant::Now()) , NeedUpdateActiveCounter_(false) + , IsOwnedBySessionPool_(isOwnedBySessionPool) {} TSession::TImpl::~TImpl() { @@ -89,6 +90,10 @@ void TSession::TImpl::MarkIdle() { NeedUpdateActiveCounter_ = false; } +bool TSession::TImpl::IsOwnedBySessionPool() const { + return IsOwnedBySessionPool_; +} + TSession::TImpl::EState TSession::TImpl::GetState() const { // See comments in InjectSessionStatusInterception about lock with_lock(Lock_) { diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.h b/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.h index 753016e25e..9fb0330fe4 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.h +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.h @@ -27,7 +27,7 @@ class TSession::TImpl : public TEndpointObj { #ifdef YDB_IMPL_TABLE_CLIENT_SESSION_UT public: #endif - TImpl(const TString& sessionId, const TString& endpoint, bool useQueryCache, ui32 queryCacheSize); + TImpl(const TString& sessionId, const TString& endpoint, bool useQueryCache, ui32 queryCacheSize, bool isOwnedBySessionPool); public: enum EState { S_STANDALONE, @@ -59,6 +59,7 @@ public: void MarkStandalone(); void MarkActive(); void MarkIdle(); + bool IsOwnedBySessionPool() const; EState GetState() const; void SetNeedUpdateActiveCounter(bool flag); bool NeedUpdateActiveCounter() const; @@ -100,6 +101,7 @@ private: // TODO: suboptimal because need lock for atomic change from interceptor // Rewrite with bit field bool NeedUpdateActiveCounter_; + const bool IsOwnedBySessionPool_; }; } // namespace NTable diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/readers.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/readers.cpp new file mode 100644 index 0000000000..721337dbde --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/readers.cpp @@ -0,0 +1,103 @@ +#include "readers.h" + +#include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h> + + +namespace NYdb { +namespace NTable { + +using namespace NThreading; + + +TTablePartIterator::TReaderImpl::TReaderImpl(TStreamProcessorPtr streamProcessor, const TString& endpoint) + : StreamProcessor_(streamProcessor) + , Finished_(false) + , Endpoint_(endpoint) +{} + +TTablePartIterator::TReaderImpl::~TReaderImpl() { + StreamProcessor_->Cancel(); +} + +bool TTablePartIterator::TReaderImpl::IsFinished() { + return Finished_; +} + +TAsyncSimpleStreamPart<TResultSet> TTablePartIterator::TReaderImpl::ReadNext(std::shared_ptr<TSelf> self) { + auto promise = NThreading::NewPromise<TSimpleStreamPart<TResultSet>>(); + // Capture self - guarantee no dtor call during the read + auto readCb = [self, promise](TGRpcStatus&& grpcStatus) mutable { + std::optional<TReadTableSnapshot> snapshot; + if (self->Response_.has_snapshot()) { + snapshot.emplace( + self->Response_.snapshot().plan_step(), + self->Response_.snapshot().tx_id()); + } + if (!grpcStatus.Ok()) { + self->Finished_ = true; + promise.SetValue({TResultSet(std::move(*self->Response_.mutable_result()->mutable_result_set())), + TStatus(TPlainStatus(grpcStatus, self->Endpoint_)), + snapshot}); + } else { + NYql::TIssues issues; + NYql::IssuesFromMessage(self->Response_.issues(), issues); + EStatus clientStatus = static_cast<EStatus>(self->Response_.status()); + promise.SetValue({TResultSet(std::move(*self->Response_.mutable_result()->mutable_result_set())), + TStatus(clientStatus, std::move(issues)), + snapshot}); + } + }; + StreamProcessor_->Read(&Response_, readCb); + return promise.GetFuture(); +} + + + +TScanQueryPartIterator::TReaderImpl::TReaderImpl(TStreamProcessorPtr streamProcessor, const TString& endpoint) + : StreamProcessor_(streamProcessor) + , Finished_(false) + , Endpoint_(endpoint) +{} + +TScanQueryPartIterator::TReaderImpl::~TReaderImpl() { + StreamProcessor_->Cancel(); +} + +bool TScanQueryPartIterator::TReaderImpl::IsFinished() const { + return Finished_; +} + +TAsyncScanQueryPart TScanQueryPartIterator::TReaderImpl::ReadNext(std::shared_ptr<TSelf> self) { + auto promise = NThreading::NewPromise<TScanQueryPart>(); + // Capture self - guarantee no dtor call during the read + auto readCb = [self, promise](TGRpcStatus&& grpcStatus) mutable { + if (!grpcStatus.Ok()) { + self->Finished_ = true; + promise.SetValue({TStatus(TPlainStatus(grpcStatus, self->Endpoint_))}); + } else { + NYql::TIssues issues; + NYql::IssuesFromMessage(self->Response_.issues(), issues); + EStatus clientStatus = static_cast<EStatus>(self->Response_.status()); + // TODO: Add headers for streaming calls. + TPlainStatus plainStatus{clientStatus, std::move(issues), self->Endpoint_, {}}; + TStatus status{std::move(plainStatus)}; + TMaybe<TQueryStats> queryStats; + + if (self->Response_.result().has_query_stats()) { + queryStats = TQueryStats(self->Response_.result().query_stats()); + } + + if (self->Response_.result().has_result_set()) { + promise.SetValue({std::move(status), + TResultSet(std::move(*self->Response_.mutable_result()->mutable_result_set())), queryStats}); + } else { + promise.SetValue({std::move(status), queryStats}); + } + } + }; + StreamProcessor_->Read(&Response_, readCb); + return promise.GetFuture(); +} + +} +} diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/readers.h b/ydb/public/sdk/cpp/client/ydb_table/impl/readers.h new file mode 100644 index 0000000000..8103ce0925 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/readers.h @@ -0,0 +1,66 @@ +#pragma once + +#include <ydb/public/sdk/cpp/client/resources/ydb_resources.h> + +#include <ydb/public/api/grpc/ydb_table_v1.grpc.pb.h> +#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> + +#include <util/random/random.h> + +#include "client_session.h" +#include "data_query.h" +#include "request_migrator.h" + + +namespace NYdb { +namespace NTable { + +using namespace NThreading; + + +class TTablePartIterator::TReaderImpl { +public: + using TSelf = TTablePartIterator::TReaderImpl; + using TResponse = Ydb::Table::ReadTableResponse; + using TStreamProcessorPtr = NGrpc::IStreamRequestReadProcessor<TResponse>::TPtr; + using TReadCallback = NGrpc::IStreamRequestReadProcessor<TResponse>::TReadCallback; + using TGRpcStatus = NGrpc::TGrpcStatus; + using TBatchReadResult = std::pair<TResponse, TGRpcStatus>; + + TReaderImpl(TStreamProcessorPtr streamProcessor, const TString& endpoint); + ~TReaderImpl(); + bool IsFinished(); + TAsyncSimpleStreamPart<TResultSet> ReadNext(std::shared_ptr<TSelf> self); + +private: + TStreamProcessorPtr StreamProcessor_; + TResponse Response_; + bool Finished_; + TString Endpoint_; +}; + + +class TScanQueryPartIterator::TReaderImpl { +public: + using TSelf = TScanQueryPartIterator::TReaderImpl; + using TResponse = Ydb::Table::ExecuteScanQueryPartialResponse; + using TStreamProcessorPtr = NGrpc::IStreamRequestReadProcessor<TResponse>::TPtr; + using TReadCallback = NGrpc::IStreamRequestReadProcessor<TResponse>::TReadCallback; + using TGRpcStatus = NGrpc::TGrpcStatus; + using TBatchReadResult = std::pair<TResponse, TGRpcStatus>; + + TReaderImpl(TStreamProcessorPtr streamProcessor, const TString& endpoint); + ~TReaderImpl(); + bool IsFinished() const; + TAsyncScanQueryPart ReadNext(std::shared_ptr<TSelf> self); + +private: + TStreamProcessorPtr StreamProcessor_; + TResponse Response_; + bool Finished_; + TString Endpoint_; +}; + + +} +} diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp index ff1729043a..35f91b69b0 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp @@ -40,7 +40,7 @@ void TRequestMigrator::SetHost(ui64 nodeId) { CurHost_ = nodeId; } -bool TRequestMigrator::IsOurSession(TSession::TImpl* session) const { +bool TRequestMigrator::IsOurSession(const TSession::TImpl* session) const { if (!CurHost_) return false; @@ -65,7 +65,7 @@ bool TRequestMigrator::Reset() { } } -bool TRequestMigrator::DoCheckAndMigrate(TSession::TImpl* session) { +bool TRequestMigrator::DoCheckAndMigrate(const TSession::TImpl* session) { if (session->GetEndpoint().empty()) return false; diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h b/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h index 2eb7e1b609..fbe6875b3a 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h @@ -51,13 +51,13 @@ public: // Returns false if session is not suitable or unable to get lock to start migration // Returns true if session is suitable in this case Unlink methos on the session is called // This methos is thread safe. - bool DoCheckAndMigrate(TSession::TImpl* session); + bool DoCheckAndMigrate(const TSession::TImpl* session); // Reset migrator to initiall state if migration was not started and returns true // Returns false if migration was started bool Reset(); private: - bool IsOurSession(TSession::TImpl* session) const; + bool IsOurSession(const TSession::TImpl* session) const; ui64 CurHost_ = 0; diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp new file mode 100644 index 0000000000..1e360b78af --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp @@ -0,0 +1,346 @@ +#include "session_pool.h" +#include "table_client.h" + + +namespace NYdb { +namespace NTable { + +using namespace NThreading; + + +TSessionPool::TWaitersQueue::TWaitersQueue(ui32 maxQueueSize, TDuration maxWaitSessionTimeout) + : MaxQueueSize_(maxQueueSize) + , MaxWaitSessionTimeout_(maxWaitSessionTimeout) +{ +} + +bool TSessionPool::TWaitersQueue::TryPush(NThreading::TPromise<TCreateSessionResult>& p) { + if (Waiters_.size() < MaxQueueSize_) { + Waiters_.insert(std::make_pair(TInstant::Now(), p)); + return true; + } + return false; +} + +TMaybe<NThreading::TPromise<TCreateSessionResult>> TSessionPool::TWaitersQueue::TryGet() { + if (Waiters_.empty()) { + return {}; + } + auto it = Waiters_.begin(); + auto result = it->second; + Waiters_.erase(it); + return result; +} + +void TSessionPool::TWaitersQueue::GetOld(TInstant now, TVector<NThreading::TPromise<TCreateSessionResult>>& oldWaiters) { + auto it = Waiters_.begin(); + while (it != Waiters_.end()) { + if (now < it->first + MaxWaitSessionTimeout_) + break; + + oldWaiters.emplace_back(std::move(it->second)); + + Waiters_.erase(it++); + } +} + +ui32 TSessionPool::TWaitersQueue::Size() const { + return Waiters_.size(); +} + + +TSessionPool::TSessionPool(ui32 maxActiveSessions) + : Closed_(false) + , WaitersQueue_(maxActiveSessions * 10) + , ActiveSessions_(0) + , MaxActiveSessions_(maxActiveSessions) +{} + +void TTableClient::TImpl::CloseAndDeleteSession(std::unique_ptr<TSession::TImpl>&& impl, + std::shared_ptr<TTableClient::TImpl> client) { + std::shared_ptr<TSession::TImpl> deleteSession( + impl.release(), + TSession::TImpl::GetSmartDeleter(client)); + + deleteSession->MarkBroken(); +} + +void TSessionPool::CreateFakeSession( + NThreading::TPromise<TCreateSessionResult>& promise, + std::shared_ptr<TTableClient::TImpl> client) +{ + TSession session(client, "", "", false); + // Mark standalone to prevent returning to session pool + session.SessionImpl_->MarkStandalone(); + TCreateSessionResult val( + TStatus( + TPlainStatus( + EStatus::CLIENT_RESOURCE_EXHAUSTED, + "Active sessions limit exceeded" + ) + ), + std::move(session) + ); + + client->ScheduleTaskUnsafe([promise, val{std::move(val)}]() mutable { + promise.SetValue(std::move(val)); + }, TDuration()); +} + +void TSessionPool::MakeSessionPromiseFromSession( + TSession::TImpl* session, + NThreading::TPromise<TCreateSessionResult>& promise, + std::shared_ptr<TTableClient::TImpl> client +) { + Y_VERIFY(session->GetState() == TSession::TImpl::S_IDLE); + Y_VERIFY(!session->GetTimeInterval()); + session->MarkActive(); + session->SetNeedUpdateActiveCounter(true); + + TCreateSessionResult val( + TStatus(TPlainStatus()), + TSession( + client, + std::shared_ptr<TSession::TImpl>( + session, TSession::TImpl::GetSmartDeleter(client) + ) + ) + ); + + client->ScheduleTaskUnsafe( + [promise, val{std::move(val)}]() mutable { + promise.SetValue(std::move(val)); + }, + TDuration() + ); +} + +TAsyncCreateSessionResult TSessionPool::GetSession( + std::shared_ptr<TTableClient::TImpl> client, + const TCreateSessionSettings& settings) +{ + auto createSessionPromise = NewPromise<TCreateSessionResult>(); + std::unique_ptr<TSession::TImpl> sessionImpl; + enum class TSessionSource { + Pool, + Waiter, + Error + } sessionSource = TSessionSource::Pool; + + { + std::lock_guard guard(Mtx_); + + if (MaxActiveSessions_ == 0 || ActiveSessions_ < MaxActiveSessions_) { + IncrementActiveCounterUnsafe(); + } else if (WaitersQueue_.TryPush(createSessionPromise)) { + sessionSource = TSessionSource::Waiter; + } else { + sessionSource = TSessionSource::Error; + } + if (!Sessions_.empty()) { + auto it = std::prev(Sessions_.end()); + sessionImpl = std::move(it->second); + Sessions_.erase(it); + } + + UpdateStats(); + } + if (sessionSource == TSessionSource::Waiter) { + // Nothing to do here + } else if (sessionSource == TSessionSource::Error) { + FakeSessionsCounter_.Inc(); + CreateFakeSession(createSessionPromise, client); + } else if (sessionImpl) { + MakeSessionPromiseFromSession(sessionImpl.release(), createSessionPromise, client); + } else { + const auto& sessionResult = client->CreateSession(settings, false); + sessionResult.Subscribe(TSession::TImpl::GetSessionInspector(createSessionPromise, client, settings, 0, true)); + } + + return createSessionPromise.GetFuture(); +} + +bool TSessionPool::CheckAndFeedWaiterNewSession(std::shared_ptr<TTableClient::TImpl> client) { + NThreading::TPromise<TCreateSessionResult> createSessionPromise; + { + std::lock_guard guard(Mtx_); + if (Closed_) + return false; + + if (auto prom = WaitersQueue_.TryGet()) { + createSessionPromise = std::move(*prom); + } else { + return false; + } + } + + // This code can be called from client dtors. It may be unsafe to + // execute grpc call directly... + client->ScheduleTaskUnsafe([createSessionPromise, client]() mutable { + TCreateSessionSettings settings; + settings.ClientTimeout(CREATE_SESSION_INTERNAL_TIMEOUT); + + const auto& sessionResult = client->CreateSession(settings, false); + sessionResult.Subscribe(TSession::TImpl::GetSessionInspector(createSessionPromise, client, settings, 0, true)); + }, TDuration()); + return true; +} + +bool TSessionPool::ReturnSession(TSession::TImpl* impl, bool active, std::shared_ptr<TTableClient::TImpl> client) { + // Do not set promise under the session pool lock + NThreading::TPromise<TCreateSessionResult> createSessionPromise; + { + std::lock_guard guard(Mtx_); + if (Closed_) + return false; + + if (auto prom = WaitersQueue_.TryGet()) { + createSessionPromise = std::move(*prom); + if (!active) + IncrementActiveCounterUnsafe(); + } else { + Sessions_.emplace(std::make_pair(impl->GetTimeToTouchFast(), impl)); + + if (active) { + Y_VERIFY(ActiveSessions_); + ActiveSessions_--; + impl->SetNeedUpdateActiveCounter(false); + } + } + UpdateStats(); + } + + if (createSessionPromise.Initialized()) { + MakeSessionPromiseFromSession(impl, createSessionPromise, client); + } + + return true; +} + +void TSessionPool::DecrementActiveCounter() { + std::lock_guard guard(Mtx_); + Y_VERIFY(ActiveSessions_); + ActiveSessions_--; + UpdateStats(); +} + +void TSessionPool::IncrementActiveCounterUnsafe() { + ActiveSessions_++; + UpdateStats(); +} + +void TSessionPool::Drain(std::function<bool(std::unique_ptr<TSession::TImpl>&&)> cb, bool close) { + std::lock_guard guard(Mtx_); + Closed_ = close; + for (auto it = Sessions_.begin(); it != Sessions_.end();) { + const bool cont = cb(std::move(it->second)); + it = Sessions_.erase(it); + if (!cont) + break; + } + UpdateStats(); +} + +TPeriodicCb TSessionPool::CreatePeriodicTask(std::weak_ptr<TTableClient::TImpl> weakClient, + TKeepAliveCmd&& cmd, TDeletePredicate&& deletePredicate) +{ + auto periodicCb = [this, weakClient, cmd=std::move(cmd), deletePredicate=std::move(deletePredicate)](NYql::TIssues&&, EStatus status) { + if (status != EStatus::SUCCESS) { + return false; + } + + auto strongClient = weakClient.lock(); + if (!strongClient) { + // No more clients alive - no need to run periodic, + // moreover it is unsafe to touch this ptr! + return false; + } else { + auto keepAliveBatchSize = PERIODIC_ACTION_BATCH_SIZE; + TVector<std::unique_ptr<TSession::TImpl>> sessionsToTouch; + sessionsToTouch.reserve(keepAliveBatchSize); + TVector<std::unique_ptr<TSession::TImpl>> sessionsToDelete; + sessionsToDelete.reserve(keepAliveBatchSize); + TVector<NThreading::TPromise<TCreateSessionResult>> waitersToReplyError; + waitersToReplyError.reserve(keepAliveBatchSize); + const auto now = TInstant::Now(); + { + std::lock_guard guard(Mtx_); + { + auto& sessions = Sessions_; + + auto it = sessions.begin(); + while (it != sessions.end() && keepAliveBatchSize--) { + if (now < it->second->GetTimeToTouchFast()) + break; + + if (deletePredicate(it->second.get(), strongClient.get(), sessions.size())) { + sessionsToDelete.emplace_back(std::move(it->second)); + } else { + sessionsToTouch.emplace_back(std::move(it->second)); + } + sessions.erase(it++); + } + } + + WaitersQueue_.GetOld(now, waitersToReplyError); + + UpdateStats(); + } + + for (auto& sessionImpl : sessionsToTouch) { + if (sessionImpl) { + Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_IDLE); + TSession session(strongClient, std::shared_ptr<TSession::TImpl>( + sessionImpl.release(), + TSession::TImpl::GetSmartDeleter(strongClient))); + cmd(session); + } + } + + for (auto& sessionImpl : sessionsToDelete) { + if (sessionImpl) { + Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_IDLE); + TTableClient::TImpl::CloseAndDeleteSession(std::move(sessionImpl), strongClient); + } + } + + for (auto& waiter : waitersToReplyError) { + FakeSessionsCounter_.Inc(); + CreateFakeSession(waiter, strongClient); + } + } + + return true; + }; + return periodicCb; +} + +i64 TSessionPool::GetActiveSessions() const { + std::lock_guard guard(Mtx_); + return ActiveSessions_; +} + +i64 TSessionPool::GetActiveSessionsLimit() const { + return MaxActiveSessions_; +} + +i64 TSessionPool::GetCurrentPoolSize() const { + std::lock_guard guard(Mtx_); + return Sessions_.size(); +} + +void TSessionPool::SetStatCollector(NSdkStats::TStatCollector::TSessionPoolStatCollector statCollector) { + ActiveSessionsCounter_.Set(statCollector.ActiveSessions); + InPoolSessionsCounter_.Set(statCollector.InPoolSessions); + FakeSessionsCounter_.Set(statCollector.FakeSessions); + SessionWaiterCounter_.Set(statCollector.Waiters); +} + +void TSessionPool::UpdateStats() { + ActiveSessionsCounter_.Apply(ActiveSessions_); + InPoolSessionsCounter_.Apply(Sessions_.size()); + SessionWaiterCounter_.Apply(WaitersQueue_.Size()); +} + +} +} diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.h b/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.h new file mode 100644 index 0000000000..983d675bce --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.h @@ -0,0 +1,90 @@ +#pragma once + +#include "client_session.h" + +#include <ydb/public/sdk/cpp/client/ydb_types/core_facility/core_facility.h> + +#include <util/generic/map.h> + + +namespace NYdb { +namespace NTable { + +using namespace NThreading; + + +constexpr TDuration MAX_WAIT_SESSION_TIMEOUT = TDuration::Seconds(5); //Max time to wait session +constexpr ui64 PERIODIC_ACTION_BATCH_SIZE = 10; //Max number of tasks to perform during one interval +constexpr TDuration CREATE_SESSION_INTERNAL_TIMEOUT = TDuration::Seconds(2); //Timeout for createSession call inside session pool + + +class TSessionPool { + typedef TAsyncCreateSessionResult + (*TAwareSessonProvider) + (std::shared_ptr<TTableClient::TImpl> client, const TCreateSessionSettings& settings); +private: + class TWaitersQueue { + public: + TWaitersQueue(ui32 maxQueueSize, TDuration maxWaitSessionTimeout=MAX_WAIT_SESSION_TIMEOUT); + + bool TryPush(NThreading::TPromise<TCreateSessionResult>& p); + TMaybe<NThreading::TPromise<TCreateSessionResult>> TryGet(); + void GetOld(TInstant now, TVector<NThreading::TPromise<TCreateSessionResult>>& oldWaiters); + ui32 Size() const; + + private: + const ui32 MaxQueueSize_; + const TDuration MaxWaitSessionTimeout_; + TMultiMap<TInstant, NThreading::TPromise<TCreateSessionResult>> Waiters_; + }; +public: + using TKeepAliveCmd = std::function<void(TSession session)>; + using TDeletePredicate = std::function<bool(TSession::TImpl* session, TTableClient::TImpl* client, size_t sessionsCount)>; + TSessionPool(ui32 maxActiveSessions); + // TAwareSessonProvider: + // function is called if session pool is empty, + // this is used for additional total session count limitation + TAsyncCreateSessionResult GetSession( + std::shared_ptr<TTableClient::TImpl> client, + const TCreateSessionSettings& settings); + // Returns true if session returned to pool successfully + bool ReturnSession(TSession::TImpl* impl, bool active, std::shared_ptr<TTableClient::TImpl> client); + // Returns trun if has waiter and scheduled to create new session + // too feed it + bool CheckAndFeedWaiterNewSession(std::shared_ptr<TTableClient::TImpl> client); + TPeriodicCb CreatePeriodicTask(std::weak_ptr<TTableClient::TImpl> weakClient, TKeepAliveCmd&& cmd, TDeletePredicate&& predicate); + i64 GetActiveSessions() const; + i64 GetActiveSessionsLimit() const; + i64 GetCurrentPoolSize() const; + void DecrementActiveCounter(); + void IncrementActiveCounterUnsafe(); + + void Drain(std::function<bool(std::unique_ptr<TSession::TImpl>&&)> cb, bool close); + void SetStatCollector(NSdkStats::TStatCollector::TSessionPoolStatCollector collector); + + static void CreateFakeSession(NThreading::TPromise<TCreateSessionResult>& promise, + std::shared_ptr<TTableClient::TImpl> client); +private: + void UpdateStats(); + void MakeSessionPromiseFromSession( + TSession::TImpl* session, + NThreading::TPromise<TCreateSessionResult>& promise, + std::shared_ptr<TTableClient::TImpl> client + ); + + mutable std::mutex Mtx_; + bool Closed_; + + TMultiMap<TInstant, std::unique_ptr<TSession::TImpl>> Sessions_; + TWaitersQueue WaitersQueue_; + + i64 ActiveSessions_; + const ui32 MaxActiveSessions_; + NSdkStats::TSessionCounter ActiveSessionsCounter_; + NSdkStats::TSessionCounter InPoolSessionsCounter_; + NSdkStats::TSessionCounter SessionWaiterCounter_; + NSdkStats::TAtomicCounter<::NMonitoring::TRate> FakeSessionsCounter_; +}; + +} +} diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp new file mode 100644 index 0000000000..096153cbcf --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp @@ -0,0 +1,1135 @@ +#include "table_client.h" + + +namespace NYdb { +namespace NTable { + +using namespace NThreading; + + +const TKeepAliveSettings TTableClient::TImpl::KeepAliveSettings = TKeepAliveSettings().ClientTimeout(KEEP_ALIVE_CLIENT_TIMEOUT); + + +bool IsSessionCloseRequested(const TStatus& status) { + const auto& meta = status.GetResponseMetadata(); + auto hints = meta.equal_range(NYdb::YDB_SERVER_HINTS); + for(auto it = hints.first; it != hints.second; ++it) { + if (it->second == NYdb::YDB_SESSION_CLOSE) { + return true; + } + } + + return false; +} + +TDuration RandomizeThreshold(TDuration duration) { + TDuration::TValue value = duration.GetValue(); + if (KEEP_ALIVE_RANDOM_FRACTION) { + const i64 randomLimit = value / KEEP_ALIVE_RANDOM_FRACTION; + if (randomLimit < 2) + return duration; + value += static_cast<i64>(RandomNumber<ui64>(randomLimit)); + } + return TDuration::FromValue(value); +} + +TDuration GetMinTimeToTouch(const TSessionPoolSettings& settings) { + return Min(settings.CloseIdleThreshold_, settings.KeepAliveIdleThreshold_); +} + +TDuration GetMaxTimeToTouch(const TSessionPoolSettings& settings) { + return Max(settings.CloseIdleThreshold_, settings.KeepAliveIdleThreshold_); +} + +TStatus GetStatus(const TOperation& operation) { + return operation.Status(); +} + +TStatus GetStatus(const TStatus& status) { + return status; +} + +ui32 CalcBackoffTime(const TBackoffSettings& settings, ui32 retryNumber) { + ui32 backoffSlots = 1 << std::min(retryNumber, settings.Ceiling_); + TDuration maxDuration = settings.SlotDuration_ * backoffSlots; + + double uncertaintyRatio = std::max(std::min(settings.UncertainRatio_, 1.0), 0.0); + double uncertaintyMultiplier = RandomNumber<double>() * uncertaintyRatio - uncertaintyRatio + 1.0; + + double durationMs = round(maxDuration.MilliSeconds() * uncertaintyMultiplier); + + return std::max(std::min(durationMs, (double)MAX_BACKOFF_DURATION_MS), 0.0); +} + + + +TTableClient::TImpl::TImpl(std::shared_ptr<TGRpcConnectionsImpl>&& connections, const TClientSettings& settings) + : TClientImplCommon(std::move(connections), settings) + , Settings_(settings) + , SessionPool_(Settings_.SessionPoolSettings_.MaxActiveSessions_) +{ + if (!DbDriverState_->StatCollector.IsCollecting()) { + return; + } + + SetStatCollector(DbDriverState_->StatCollector.GetClientStatCollector()); + SessionPool_.SetStatCollector(DbDriverState_->StatCollector.GetSessionPoolStatCollector()); +} + +TTableClient::TImpl::~TImpl() { + if (Connections_->GetDrainOnDtors()) { + Drain().Wait(); + } +} + +bool TTableClient::TImpl::LinkObjToEndpoint(const TEndpointKey& endpoint, TEndpointObj* obj, const void* tag) { + return DbDriverState_->EndpointPool.LinkObjToEndpoint(endpoint, obj, tag); +} + +void TTableClient::TImpl::InitStopper() { + std::weak_ptr<TTableClient::TImpl> weak = shared_from_this(); + auto cb = [weak]() mutable { + auto strong = weak.lock(); + if (!strong) { + auto promise = NThreading::NewPromise<void>(); + promise.SetException("no more client"); + return promise.GetFuture(); + } + return strong->Drain(); + }; + + DbDriverState_->AddCb(std::move(cb), TDbDriverState::ENotifyType::STOP); +} + +NThreading::TFuture<void> TTableClient::TImpl::Drain() { + TVector<std::unique_ptr<TSession::TImpl>> sessions; + // No realocations under lock + sessions.reserve(Settings_.SessionPoolSettings_.MaxActiveSessions_); + auto drainer = [&sessions](std::unique_ptr<TSession::TImpl>&& impl) mutable { + sessions.push_back(std::move(impl)); + return true; + }; + SessionPool_.Drain(drainer, true); + TVector<TAsyncStatus> closeResults; + for (auto& s : sessions) { + if (s->GetId()) { + closeResults.push_back(CloseInternal(s.get())); + } + } + sessions.clear(); + return NThreading::WaitExceptionOrAll(closeResults); +} + +NThreading::TFuture<void> TTableClient::TImpl::Stop() { + return Drain(); +} + +void TTableClient::TImpl::ScheduleTask(const std::function<void()>& fn, TDuration timeout) { + std::weak_ptr<TTableClient::TImpl> weak = shared_from_this(); + auto cbGuard = [weak, fn]() { + auto strongClient = weak.lock(); + if (strongClient) { + fn(); + } + }; + Connections_->ScheduleOneTimeTask(std::move(cbGuard), timeout); +} + +void TTableClient::TImpl::ScheduleTaskUnsafe(std::function<void()>&& fn, TDuration timeout) { + Connections_->ScheduleOneTimeTask(std::move(fn), timeout); +} + +void TTableClient::TImpl::AsyncBackoff(const TBackoffSettings& settings, ui32 retryNumber, const std::function<void()>& fn) { + auto durationMs = CalcBackoffTime(settings, retryNumber); + ScheduleTask(fn, TDuration::MilliSeconds(durationMs)); +} + +void TTableClient::TImpl::StartPeriodicSessionPoolTask() { + + auto deletePredicate = [](TSession::TImpl* session, TTableClient::TImpl* client, size_t sessionsCount) { + + const auto sessionPoolSettings = client->Settings_.SessionPoolSettings_; + const auto spentTime = session->GetTimeToTouchFast() - session->GetTimeInPastFast(); + + if (spentTime >= sessionPoolSettings.CloseIdleThreshold_) { + if (sessionsCount > sessionPoolSettings.MinPoolSize_) { + return true; + } + } + + return false; + }; + + auto keepAliveCmd = [](TSession session) { + + Y_VERIFY(session.GetId()); + + const auto sessionPoolSettings = session.Client_->Settings_.SessionPoolSettings_; + const auto spentTime = session.SessionImpl_->GetTimeToTouchFast() - session.SessionImpl_->GetTimeInPastFast(); + + const auto maxTimeToTouch = GetMaxTimeToTouch(session.Client_->Settings_.SessionPoolSettings_); + const auto minTimeToTouch = GetMinTimeToTouch(session.Client_->Settings_.SessionPoolSettings_); + + auto calcTimeToNextTouch = [maxTimeToTouch, minTimeToTouch] (const TDuration spent) { + auto timeToNextTouch = minTimeToTouch; + if (maxTimeToTouch > spent) { + auto t = maxTimeToTouch - spent; + timeToNextTouch = Min(t, minTimeToTouch); + } + return timeToNextTouch; + }; + + if (spentTime >= sessionPoolSettings.KeepAliveIdleThreshold_) { + + // Handle of session status will be done inside InjectSessionStatusInterception routine. + // We just need to reschedule time to next call because InjectSessionStatusInterception doesn't + // update timeInPast for calls from internal keep alive routine + session.KeepAlive(KeepAliveSettings) + .Subscribe([spentTime, session, maxTimeToTouch, calcTimeToNextTouch](TAsyncKeepAliveResult asyncResult) { + if (!asyncResult.GetValue().IsSuccess()) + return; + + if (spentTime >= maxTimeToTouch) { + auto timeToNextTouch = calcTimeToNextTouch(spentTime); + session.SessionImpl_->ScheduleTimeToTouchFast(timeToNextTouch, true); + } + }); + return; + } + + auto timeToNextTouch = calcTimeToNextTouch(spentTime); + session.SessionImpl_->ScheduleTimeToTouchFast( + RandomizeThreshold(timeToNextTouch), + spentTime >= maxTimeToTouch + ); + }; + + std::weak_ptr<TTableClient::TImpl> weak = shared_from_this(); + Connections_->AddPeriodicTask( + SessionPool_.CreatePeriodicTask( + weak, + std::move(keepAliveCmd), + std::move(deletePredicate) + ), PERIODIC_ACTION_INTERVAL); +} + +ui64 TTableClient::TImpl::ScanForeignLocations(std::shared_ptr<TTableClient::TImpl> client) { + size_t max = 0; + ui64 result = 0; + + auto cb = [&result, &max](ui64 nodeId, const IObjRegistryHandle& handle) { + const auto sz = handle.Size(); + if (sz > max) { + result = nodeId; + max = sz; + } + }; + + client->DbDriverState_->ForEachForeignEndpoint(cb, client.get()); + + return result; +} + +std::pair<ui64, size_t> TTableClient::TImpl::ScanLocation(std::shared_ptr<TTableClient::TImpl> client, + std::unordered_map<ui64, size_t>& sessions, bool allNodes) +{ + std::pair<ui64, size_t> result = {0, 0}; + + auto cb = [&result, &sessions](ui64 nodeId, const IObjRegistryHandle& handle) { + const auto sz = handle.Size(); + sessions.insert({nodeId, sz}); + if (sz > result.second) { + result.first = nodeId; + result.second = sz; + } + }; + + if (allNodes) { + client->DbDriverState_->ForEachEndpoint(cb, client.get()); + } else { + client->DbDriverState_->ForEachLocalEndpoint(cb, client.get()); + } + + return result; +} + +NMath::TStats TTableClient::TImpl::CalcCV(const std::unordered_map<ui64, size_t>& in) { + TVector<size_t> t; + t.reserve(in.size()); + std::transform(in.begin(), in.end(), std::back_inserter(t), [](const std::pair<ui64, size_t>& pair) { + return pair.second; + }); + return NMath::CalcCV(t); +} + +void TTableClient::TImpl::StartPeriodicHostScanTask() { + std::weak_ptr<TTableClient::TImpl> weak = shared_from_this(); + + // The future in completed when we have finished current migrate task + // and ready to accept new one + std::pair<ui64, size_t> winner = {0, 0}; + + auto periodicCb = [weak, winner](NYql::TIssues&&, EStatus status) mutable -> bool { + + if (status != EStatus::SUCCESS) { + return false; + } + + auto strongClient = weak.lock(); + if (!strongClient) { + return false; + } else { + TRequestMigrator& migrator = strongClient->RequestMigrator_; + + const auto balancingPolicy = strongClient->DbDriverState_->GetBalancingPolicy(); + + // Try to find any host at foreign locations if prefer local dc + const ui64 foreignHost = (balancingPolicy == EBalancingPolicy::UsePreferableLocation) ? + ScanForeignLocations(strongClient) : 0; + + std::unordered_map<ui64, size_t> hostMap; + + winner = ScanLocation(strongClient, hostMap, + balancingPolicy == EBalancingPolicy::UseAllNodes); + + bool forceMigrate = false; + + // There is host in foreign locations + if (foreignHost) { + // But no hosts at local + if (hostMap.empty()) { + Y_VERIFY(!winner.second); + // Scan whole cluster - we have no local dc + winner = ScanLocation(strongClient, hostMap, true); + } else { + // We have local and foreign hosts, so force migration to local one + forceMigrate = true; + // Just replace source + winner.first = foreignHost; + winner.second++; + } + } + + const auto minCv = strongClient->Settings_.MinSessionCV_; + + const auto stats = CalcCV(hostMap); + + strongClient->DbDriverState_->StatCollector.SetSessionCV(stats.Cv); + + // Just scan to update monitoring counter ^^ + // Balancing feature is disabled. + if (!minCv) + return true; + + if (hostMap.size() < 2) + return true; + + // Migrate last session only if move from foreign to local + if (!forceMigrate && winner.second < 2) + return true; + + if (stats.Cv > minCv || forceMigrate) { + migrator.SetHost(winner.first); + } else { + migrator.SetHost(0); + } + return true; + } + }; + + Connections_->AddPeriodicTask(std::move(periodicCb), HOSTSCAN_PERIODIC_ACTION_INTERVAL); +} + +TAsyncCreateSessionResult TTableClient::TImpl::GetSession(const TCreateSessionSettings& settings) { + return SessionPool_.GetSession(shared_from_this(), settings); +} + +i64 TTableClient::TImpl::GetActiveSessionCount() const { + return SessionPool_.GetActiveSessions(); +} + +i64 TTableClient::TImpl::GetActiveSessionsLimit() const { + return SessionPool_.GetActiveSessionsLimit(); +} + +i64 TTableClient::TImpl::GetCurrentPoolSize() const { + return SessionPool_.GetCurrentPoolSize(); +} + +TAsyncCreateSessionResult TTableClient::TImpl::CreateSession(const TCreateSessionSettings& settings, bool standalone, + TString preferedLocation) +{ + auto request = MakeOperationRequest<Ydb::Table::CreateSessionRequest>(settings); + + auto createSessionPromise = NewPromise<TCreateSessionResult>(); + auto self = shared_from_this(); + auto rpcSettings = TRpcRequestSettings::Make(settings); + rpcSettings.Header.push_back({NYdb::YDB_CLIENT_CAPABILITIES, NYdb::YDB_CLIENT_CAPABILITY_SESSION_BALANCER}); + + auto createSessionExtractor = [createSessionPromise, self, standalone] + (google::protobuf::Any* any, TPlainStatus status) mutable { + Ydb::Table::CreateSessionResult result; + if (any) { + any->UnpackTo(&result); + } + auto session = TSession(self, result.session_id(), status.Endpoint, !standalone); + if (status.Ok()) { + if (standalone) { + session.SessionImpl_->MarkStandalone(); + } else { + session.SessionImpl_->MarkActive(); + } + self->DbDriverState_->StatCollector.IncSessionsOnHost(status.Endpoint); + } else { + // We do not use SessionStatusInterception for CreateSession request + session.SessionImpl_->MarkBroken(); + } + TCreateSessionResult val(TStatus(std::move(status)), std::move(session)); + createSessionPromise.SetValue(std::move(val)); + }; + + Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::CreateSessionRequest, Ydb::Table::CreateSessionResponse>( + std::move(request), + createSessionExtractor, + &Ydb::Table::V1::TableService::Stub::AsyncCreateSession, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + rpcSettings, + TEndpointKey(preferedLocation, 0)); + + std::weak_ptr<TDbDriverState> state = DbDriverState_; + + return createSessionPromise.GetFuture(); +} + +TAsyncKeepAliveResult TTableClient::TImpl::KeepAlive(const TSession::TImpl* session, const TKeepAliveSettings& settings) { + auto request = MakeOperationRequest<Ydb::Table::KeepAliveRequest>(settings); + request.set_session_id(session->GetId()); + + auto keepAliveResultPromise = NewPromise<TKeepAliveResult>(); + auto self = shared_from_this(); + + auto keepAliveExtractor = [keepAliveResultPromise, self] + (google::protobuf::Any* any, TPlainStatus status) mutable { + Ydb::Table::KeepAliveResult result; + ESessionStatus sessionStatus = ESessionStatus::Unspecified; + if (any) { + any->UnpackTo(&result); + + switch (result.session_status()) { + case Ydb::Table::KeepAliveResult_SessionStatus_SESSION_STATUS_READY: + sessionStatus = ESessionStatus::Ready; + break; + case Ydb::Table::KeepAliveResult_SessionStatus_SESSION_STATUS_BUSY: + sessionStatus = ESessionStatus::Busy; + break; + default: + sessionStatus = ESessionStatus::Unspecified; + } + } + TKeepAliveResult val(TStatus(std::move(status)), sessionStatus); + keepAliveResultPromise.SetValue(std::move(val)); + }; + + Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::KeepAliveRequest, Ydb::Table::KeepAliveResponse>( + std::move(request), + keepAliveExtractor, + &Ydb::Table::V1::TableService::Stub::AsyncKeepAlive, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings), + session->GetEndpointKey()); + + return keepAliveResultPromise.GetFuture(); +} + +TFuture<TStatus> TTableClient::TImpl::CreateTable(Ydb::Table::CreateTableRequest&& request, const TCreateTableSettings& settings) +{ + return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::CreateTableRequest,Ydb::Table::CreateTableResponse>( + std::move(request), + &Ydb::Table::V1::TableService::Stub::AsyncCreateTable, + TRpcRequestSettings::Make(settings)); +} + +TFuture<TStatus> TTableClient::TImpl::AlterTable(Ydb::Table::AlterTableRequest&& request, const TAlterTableSettings& settings) +{ + return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::AlterTableRequest, Ydb::Table::AlterTableResponse>( + std::move(request), + &Ydb::Table::V1::TableService::Stub::AsyncAlterTable, + TRpcRequestSettings::Make(settings)); +} + +TAsyncOperation TTableClient::TImpl::AlterTableLong(Ydb::Table::AlterTableRequest&& request, const TAlterTableSettings& settings) +{ + using Ydb::Table::V1::TableService; + using Ydb::Table::AlterTableRequest; + using Ydb::Table::AlterTableResponse; + return RunOperation<TableService, AlterTableRequest, AlterTableResponse, TOperation>( + std::move(request), + &Ydb::Table::V1::TableService::Stub::AsyncAlterTable, + TRpcRequestSettings::Make(settings)); +} + +TFuture<TStatus> TTableClient::TImpl::CopyTable(const TString& sessionId, const TString& src, const TString& dst, + const TCopyTableSettings& settings) +{ + auto request = MakeOperationRequest<Ydb::Table::CopyTableRequest>(settings); + request.set_session_id(sessionId); + request.set_source_path(src); + request.set_destination_path(dst); + + return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::CopyTableRequest, Ydb::Table::CopyTableResponse>( + std::move(request), + &Ydb::Table::V1::TableService::Stub::AsyncCopyTable, + TRpcRequestSettings::Make(settings)); +} + +TFuture<TStatus> TTableClient::TImpl::CopyTables(Ydb::Table::CopyTablesRequest&& request, const TCopyTablesSettings& settings) +{ + return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::CopyTablesRequest, Ydb::Table::CopyTablesResponse>( + std::move(request), + &Ydb::Table::V1::TableService::Stub::AsyncCopyTables, + TRpcRequestSettings::Make(settings)); +} + +TFuture<TStatus> TTableClient::TImpl::RenameTables(Ydb::Table::RenameTablesRequest&& request, const TRenameTablesSettings& settings) +{ + return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::RenameTablesRequest, Ydb::Table::RenameTablesResponse>( + std::move(request), + &Ydb::Table::V1::TableService::Stub::AsyncRenameTables, + TRpcRequestSettings::Make(settings)); +} + +TFuture<TStatus> TTableClient::TImpl::DropTable(const TString& sessionId, const TString& path, const TDropTableSettings& settings) { + auto request = MakeOperationRequest<Ydb::Table::DropTableRequest>(settings); + request.set_session_id(sessionId); + request.set_path(path); + + return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::DropTableRequest, Ydb::Table::DropTableResponse>( + std::move(request), + &Ydb::Table::V1::TableService::Stub::AsyncDropTable, + TRpcRequestSettings::Make(settings)); +} + +TAsyncDescribeTableResult TTableClient::TImpl::DescribeTable(const TString& sessionId, const TString& path, const TDescribeTableSettings& settings) { + auto request = MakeOperationRequest<Ydb::Table::DescribeTableRequest>(settings); + request.set_session_id(sessionId); + request.set_path(path); + if (settings.WithKeyShardBoundary_) { + request.set_include_shard_key_bounds(true); + } + + if (settings.WithTableStatistics_) { + request.set_include_table_stats(true); + } + + if (settings.WithPartitionStatistics_) { + request.set_include_partition_stats(true); + } + + auto promise = NewPromise<TDescribeTableResult>(); + + auto extractor = [promise, settings] + (google::protobuf::Any* any, TPlainStatus status) mutable { + Ydb::Table::DescribeTableResult result; + if (any) { + any->UnpackTo(&result); + } + TDescribeTableResult describeTableResult(TStatus(std::move(status)), + std::move(result), settings); + promise.SetValue(std::move(describeTableResult)); + }; + + Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::DescribeTableRequest, Ydb::Table::DescribeTableResponse>( + std::move(request), + extractor, + &Ydb::Table::V1::TableService::Stub::AsyncDescribeTable, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings)); + + return promise.GetFuture(); +} + +TAsyncPrepareQueryResult TTableClient::TImpl::PrepareDataQuery(const TSession& session, const TString& query, + const TPrepareDataQuerySettings& settings) +{ + auto request = MakeOperationRequest<Ydb::Table::PrepareDataQueryRequest>(settings); + request.set_session_id(session.GetId()); + request.set_yql_text(query); + + auto promise = NewPromise<TPrepareQueryResult>(); + + //See ExecuteDataQueryInternal for explanation + auto sessionPtr = new TSession(session); + auto extractor = [promise, sessionPtr, query] + (google::protobuf::Any* any, TPlainStatus status) mutable { + TDataQuery dataQuery(*sessionPtr, query, ""); + + if (any) { + Ydb::Table::PrepareQueryResult result; + any->UnpackTo(&result); + + if (status.Ok()) { + dataQuery = TDataQuery(*sessionPtr, query, result.query_id(), result.parameters_types()); + sessionPtr->SessionImpl_->AddQueryToCache(dataQuery); + } + } + + TPrepareQueryResult prepareQueryResult(TStatus(std::move(status)), + dataQuery, false); + delete sessionPtr; + promise.SetValue(std::move(prepareQueryResult)); + }; + + CollectQuerySize(query, QuerySizeHistogram); + + Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::PrepareDataQueryRequest, Ydb::Table::PrepareDataQueryResponse>( + std::move(request), + extractor, + &Ydb::Table::V1::TableService::Stub::AsyncPrepareDataQuery, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings), + session.SessionImpl_->GetEndpointKey()); + + return promise.GetFuture(); +} + +TAsyncStatus TTableClient::TImpl::ExecuteSchemeQuery(const TString& sessionId, const TString& query, + const TExecSchemeQuerySettings& settings) +{ + auto request = MakeOperationRequest<Ydb::Table::ExecuteSchemeQueryRequest>(settings); + request.set_session_id(sessionId); + request.set_yql_text(query); + + return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::ExecuteSchemeQueryRequest, Ydb::Table::ExecuteSchemeQueryResponse>( + std::move(request), + &Ydb::Table::V1::TableService::Stub::AsyncExecuteSchemeQuery, + TRpcRequestSettings::Make(settings)); +} + +TAsyncBeginTransactionResult TTableClient::TImpl::BeginTransaction(const TSession& session, const TTxSettings& txSettings, + const TBeginTxSettings& settings) +{ + auto request = MakeOperationRequest<Ydb::Table::BeginTransactionRequest>(settings); + request.set_session_id(session.GetId()); + SetTxSettings(txSettings, request.mutable_tx_settings()); + + auto promise = NewPromise<TBeginTransactionResult>(); + + auto extractor = [promise, session] + (google::protobuf::Any* any, TPlainStatus status) mutable { + TString txId; + if (any) { + Ydb::Table::BeginTransactionResult result; + any->UnpackTo(&result); + txId = result.tx_meta().id(); + } + + TBeginTransactionResult beginTxResult(TStatus(std::move(status)), + TTransaction(session, txId)); + promise.SetValue(std::move(beginTxResult)); + }; + + Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::BeginTransactionRequest, Ydb::Table::BeginTransactionResponse>( + std::move(request), + extractor, + &Ydb::Table::V1::TableService::Stub::AsyncBeginTransaction, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings), + session.SessionImpl_->GetEndpointKey()); + + return promise.GetFuture(); +} + +TAsyncCommitTransactionResult TTableClient::TImpl::CommitTransaction(const TSession& session, const TTransaction& tx, + const TCommitTxSettings& settings) +{ + auto request = MakeOperationRequest<Ydb::Table::CommitTransactionRequest>(settings); + request.set_session_id(session.GetId()); + request.set_tx_id(tx.GetId()); + request.set_collect_stats(GetStatsCollectionMode(settings.CollectQueryStats_)); + + auto promise = NewPromise<TCommitTransactionResult>(); + + auto extractor = [promise] + (google::protobuf::Any* any, TPlainStatus status) mutable { + TMaybe<TQueryStats> queryStats; + if (any) { + Ydb::Table::CommitTransactionResult result; + any->UnpackTo(&result); + + if (result.has_query_stats()) { + queryStats = TQueryStats(result.query_stats()); + } + } + + TCommitTransactionResult commitTxResult(TStatus(std::move(status)), queryStats); + promise.SetValue(std::move(commitTxResult)); + }; + + Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::CommitTransactionRequest, Ydb::Table::CommitTransactionResponse>( + std::move(request), + extractor, + &Ydb::Table::V1::TableService::Stub::AsyncCommitTransaction, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings), + session.SessionImpl_->GetEndpointKey()); + + return promise.GetFuture(); +} + +TAsyncStatus TTableClient::TImpl::RollbackTransaction(const TSession& session, const TTransaction& tx, + const TRollbackTxSettings& settings) +{ + auto request = MakeOperationRequest<Ydb::Table::RollbackTransactionRequest>(settings); + request.set_session_id(session.GetId()); + request.set_tx_id(tx.GetId()); + + return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::RollbackTransactionRequest, Ydb::Table::RollbackTransactionResponse>( + std::move(request), + &Ydb::Table::V1::TableService::Stub::AsyncRollbackTransaction, + TRpcRequestSettings::Make(settings), + session.SessionImpl_->GetEndpointKey()); +} + +TAsyncExplainDataQueryResult TTableClient::TImpl::ExplainDataQuery(const TSession& session, const TString& query, + const TExplainDataQuerySettings& settings) +{ + auto request = MakeOperationRequest<Ydb::Table::ExplainDataQueryRequest>(settings); + request.set_session_id(session.GetId()); + request.set_yql_text(query); + + auto promise = NewPromise<TExplainQueryResult>(); + + auto extractor = [promise] + (google::protobuf::Any* any, TPlainStatus status) mutable { + TString ast; + TString plan; + if (any) { + Ydb::Table::ExplainQueryResult result; + any->UnpackTo(&result); + ast = result.query_ast(); + plan = result.query_plan(); + } + TExplainQueryResult val(TStatus(std::move(status)), + std::move(plan), std::move(ast)); + promise.SetValue(std::move(val)); + }; + + Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::ExplainDataQueryRequest, Ydb::Table::ExplainDataQueryResponse>( + std::move(request), + extractor, + &Ydb::Table::V1::TableService::Stub::AsyncExplainDataQuery, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings), + session.SessionImpl_->GetEndpointKey()); + + return promise.GetFuture(); +} + +void TTableClient::TImpl::SetTypedValue(Ydb::TypedValue* protoValue, const TValue& value) { + protoValue->mutable_type()->CopyFrom(TProtoAccessor::GetProto(value.GetType())); + protoValue->mutable_value()->CopyFrom(TProtoAccessor::GetProto(value)); +} + +NThreading::TFuture<std::pair<TPlainStatus, TTableClient::TImpl::TReadTableStreamProcessorPtr>> TTableClient::TImpl::ReadTable( + const TString& sessionId, + const TString& path, + const TReadTableSettings& settings) +{ + auto request = MakeRequest<Ydb::Table::ReadTableRequest>(); + request.set_session_id(sessionId); + request.set_path(path); + request.set_ordered(settings.Ordered_); + if (settings.RowLimit_) { + request.set_row_limit(settings.RowLimit_.GetRef()); + } + for (const auto& col : settings.Columns_) { + request.add_columns(col); + } + if (settings.UseSnapshot_) { + request.set_use_snapshot( + settings.UseSnapshot_.GetRef() + ? Ydb::FeatureFlag::ENABLED + : Ydb::FeatureFlag::DISABLED); + } + + if (settings.From_) { + const auto& from = settings.From_.GetRef(); + if (from.IsInclusive()) { + SetTypedValue(request.mutable_key_range()->mutable_greater_or_equal(), from.GetValue()); + } else { + SetTypedValue(request.mutable_key_range()->mutable_greater(), from.GetValue()); + } + } + + if (settings.To_) { + const auto& to = settings.To_.GetRef(); + if (to.IsInclusive()) { + SetTypedValue(request.mutable_key_range()->mutable_less_or_equal(), to.GetValue()); + } else { + SetTypedValue(request.mutable_key_range()->mutable_less(), to.GetValue()); + } + } + + auto promise = NewPromise<std::pair<TPlainStatus, TReadTableStreamProcessorPtr>>(); + + Connections_->StartReadStream<Ydb::Table::V1::TableService, Ydb::Table::ReadTableRequest, Ydb::Table::ReadTableResponse>( + std::move(request), + [promise] (TPlainStatus status, TReadTableStreamProcessorPtr processor) mutable { + promise.SetValue(std::make_pair(status, processor)); + }, + &Ydb::Table::V1::TableService::Stub::AsyncStreamReadTable, + DbDriverState_, + TRpcRequestSettings::Make(settings)); + + return promise.GetFuture(); + +} + +TAsyncReadRowsResult TTableClient::TImpl::ReadRows(const TString& path, TValue&& keys, const TReadRowsSettings& settings) { + auto request = MakeRequest<Ydb::Table::ReadRowsRequest>(); + request.set_path(path); + auto* protoKeys = request.mutable_keys(); + *protoKeys->mutable_type() = TProtoAccessor::GetProto(keys.GetType()); + *protoKeys->mutable_value() = TProtoAccessor::GetProto(keys); + + auto promise = NewPromise<TReadRowsResult>(); + + auto responseCb = [promise] + (Ydb::Table::ReadRowsResponse* response, TPlainStatus status) mutable { + Y_VERIFY(response); + TResultSet resultSet = TResultSet(response->result_set()); + TReadRowsResult val(TStatus(std::move(status)), std::move(resultSet)); + promise.SetValue(std::move(val)); + }; + + Connections_->Run<Ydb::Table::V1::TableService, Ydb::Table::ReadRowsRequest, Ydb::Table::ReadRowsResponse>( + std::move(request), + responseCb, + &Ydb::Table::V1::TableService::Stub::AsyncReadRows, + DbDriverState_, + TRpcRequestSettings::Make(settings), // requestSettings + TEndpointKey() // preferredEndpoint + ); + + return promise.GetFuture(); +} + +TAsyncStatus TTableClient::TImpl::Close(const TSession::TImpl* sessionImpl, const TCloseSessionSettings& settings) { + auto request = MakeOperationRequest<Ydb::Table::DeleteSessionRequest>(settings); + request.set_session_id(sessionImpl->GetId()); + return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::DeleteSessionRequest, Ydb::Table::DeleteSessionResponse>( + std::move(request), + &Ydb::Table::V1::TableService::Stub::AsyncDeleteSession, + TRpcRequestSettings::Make(settings), + sessionImpl->GetEndpointKey()); +} + +TAsyncStatus TTableClient::TImpl::CloseInternal(const TSession::TImpl* sessionImpl) { + static const auto internalCloseSessionSettings = TCloseSessionSettings() + .ClientTimeout(TDuration::Seconds(2)); + + auto driver = Connections_; + return Close(sessionImpl, internalCloseSessionSettings) + .Apply([driver{std::move(driver)}](TAsyncStatus status) mutable + { + driver.reset(); + return status; + }); +} + +bool TTableClient::TImpl::ReturnSession(TSession::TImpl* sessionImpl) { + Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_ACTIVE || + sessionImpl->GetState() == TSession::TImpl::S_IDLE); + + if (RequestMigrator_.DoCheckAndMigrate(sessionImpl)) { + SessionRemovedDueBalancing.Inc(); + return false; + } + + bool needUpdateCounter = sessionImpl->NeedUpdateActiveCounter(); + // Also removes NeedUpdateActiveCounter flag + sessionImpl->MarkIdle(); + sessionImpl->SetTimeInterval(TDuration::Zero()); + if (!SessionPool_.ReturnSession(sessionImpl, needUpdateCounter, shared_from_this())) { + sessionImpl->SetNeedUpdateActiveCounter(needUpdateCounter); + return false; + } + return true; +} + +void TTableClient::TImpl::DeleteSession(TSession::TImpl* sessionImpl) { + // Closing not owned by session pool session should not fire getting new session + if (sessionImpl->IsOwnedBySessionPool()) { + if (SessionPool_.CheckAndFeedWaiterNewSession(shared_from_this())) { + // We requested new session for waiter which already incremented + // active session counter and old session will be deleted + // - skip update active counter in this case + sessionImpl->SetNeedUpdateActiveCounter(false); + } + } + + if (sessionImpl->NeedUpdateActiveCounter()) { + SessionPool_.DecrementActiveCounter(); + } + + if (sessionImpl->GetId()) { + CloseInternal(sessionImpl); + DbDriverState_->StatCollector.DecSessionsOnHost(sessionImpl->GetEndpoint()); + } + + delete sessionImpl; +} + +ui32 TTableClient::TImpl::GetSessionRetryLimit() const { + return Settings_.SessionPoolSettings_.RetryLimit_; +} + +void TTableClient::TImpl::SetStatCollector(const NSdkStats::TStatCollector::TClientStatCollector& collector) { + CacheMissCounter.Set(collector.CacheMiss); + QuerySizeHistogram.Set(collector.QuerySize); + ParamsSizeHistogram.Set(collector.ParamsSize); + RetryOperationStatCollector = collector.RetryOperationStatCollector; + SessionRemovedDueBalancing.Set(collector.SessionRemovedDueBalancing); + RequestMigrated.Set(collector.RequestMigrated); +} + +TAsyncBulkUpsertResult TTableClient::TImpl::BulkUpsert(const TString& table, TValue&& rows, const TBulkUpsertSettings& settings) { + auto request = MakeOperationRequest<Ydb::Table::BulkUpsertRequest>(settings); + request.set_table(table); + *request.mutable_rows()->mutable_type() = TProtoAccessor::GetProto(rows.GetType()); + *request.mutable_rows()->mutable_value() = std::move(rows.GetProto()); + + auto promise = NewPromise<TBulkUpsertResult>(); + + auto extractor = [promise] + (google::protobuf::Any* any, TPlainStatus status) mutable { + Y_UNUSED(any); + TBulkUpsertResult val(TStatus(std::move(status))); + promise.SetValue(std::move(val)); + }; + + Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::BulkUpsertRequest, Ydb::Table::BulkUpsertResponse>( + std::move(request), + extractor, + &Ydb::Table::V1::TableService::Stub::AsyncBulkUpsert, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings)); + + return promise.GetFuture(); +} + +TAsyncBulkUpsertResult TTableClient::TImpl::BulkUpsert(const TString& table, EDataFormat format, + const TString& data, const TString& schema, const TBulkUpsertSettings& settings) +{ + auto request = MakeOperationRequest<Ydb::Table::BulkUpsertRequest>(settings); + request.set_table(table); + if (format == EDataFormat::ApacheArrow) { + request.mutable_arrow_batch_settings()->set_schema(schema); + } else if (format == EDataFormat::CSV) { + auto* csv_settings = request.mutable_csv_settings(); + const auto& format_settings = settings.FormatSettings_; + if (!format_settings.empty()) { + bool ok = csv_settings->ParseFromString(format_settings); + if (!ok) { + return {}; + } + } + } + request.set_data(data); + + auto promise = NewPromise<TBulkUpsertResult>(); + + auto extractor = [promise] + (google::protobuf::Any* any, TPlainStatus status) mutable { + Y_UNUSED(any); + TBulkUpsertResult val(TStatus(std::move(status))); + promise.SetValue(std::move(val)); + }; + + Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::BulkUpsertRequest, Ydb::Table::BulkUpsertResponse>( + std::move(request), + extractor, + &Ydb::Table::V1::TableService::Stub::AsyncBulkUpsert, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings)); + + return promise.GetFuture(); +} + +TFuture<std::pair<TPlainStatus, TTableClient::TImpl::TScanQueryProcessorPtr>> TTableClient::TImpl::StreamExecuteScanQueryInternal(const TString& query, + const ::google::protobuf::Map<TString, Ydb::TypedValue>* params, + const TStreamExecScanQuerySettings& settings) +{ + auto request = MakeRequest<Ydb::Table::ExecuteScanQueryRequest>(); + request.mutable_query()->set_yql_text(query); + + if (params) { + *request.mutable_parameters() = *params; + } + + if (settings.Explain_) { + request.set_mode(Ydb::Table::ExecuteScanQueryRequest::MODE_EXPLAIN); + } else { + request.set_mode(Ydb::Table::ExecuteScanQueryRequest::MODE_EXEC); + } + + request.set_collect_stats(GetStatsCollectionMode(settings.CollectQueryStats_)); + + auto promise = NewPromise<std::pair<TPlainStatus, TScanQueryProcessorPtr>>(); + + Connections_->StartReadStream< + Ydb::Table::V1::TableService, + Ydb::Table::ExecuteScanQueryRequest, + Ydb::Table::ExecuteScanQueryPartialResponse> + ( + std::move(request), + [promise] (TPlainStatus status, TScanQueryProcessorPtr processor) mutable { + promise.SetValue(std::make_pair(status, processor)); + }, + &Ydb::Table::V1::TableService::Stub::AsyncStreamExecuteScanQuery, + DbDriverState_, + TRpcRequestSettings::Make(settings) + ); + + return promise.GetFuture(); +} + +TAsyncScanQueryPartIterator TTableClient::TImpl::StreamExecuteScanQuery(const TString& query, + const ::google::protobuf::Map<TString, Ydb::TypedValue>* params, + const TStreamExecScanQuerySettings& settings) +{ + auto promise = NewPromise<TScanQueryPartIterator>(); + + auto iteratorCallback = [promise](TFuture<std::pair<TPlainStatus, + TTableClient::TImpl::TScanQueryProcessorPtr>> future) mutable + { + Y_ASSERT(future.HasValue()); + auto pair = future.ExtractValue(); + promise.SetValue(TScanQueryPartIterator( + pair.second + ? std::make_shared<TScanQueryPartIterator::TReaderImpl>(pair.second, pair.first.Endpoint) + : nullptr, + std::move(pair.first)) + ); + }; + + StreamExecuteScanQueryInternal(query, params, settings).Subscribe(iteratorCallback); + return promise.GetFuture(); +} + +// void TTableClient::TImpl::CloseAndDeleteSession( +// std::unique_ptr<TSession::TImpl>&& impl, +// std::shared_ptr<TTableClient::TImpl> client); + + +void TTableClient::TImpl::SetParams( + ::google::protobuf::Map<TString, Ydb::TypedValue>* params, + Ydb::Table::ExecuteDataQueryRequest* request) +{ + if (params) { + request->mutable_parameters()->swap(*params); + } +} + +void TTableClient::TImpl::SetParams( + const ::google::protobuf::Map<TString, Ydb::TypedValue>& params, + Ydb::Table::ExecuteDataQueryRequest* request) +{ + *request->mutable_parameters() = params; +} + +void TTableClient::TImpl::CollectParams( + ::google::protobuf::Map<TString, Ydb::TypedValue>* params, + NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> histgoram) +{ + + if (params && histgoram.IsCollecting()) { + size_t size = 0; + for (auto& keyvalue: *params) { + size += keyvalue.second.ByteSizeLong(); + } + histgoram.Record(size); + } +} + +void TTableClient::TImpl::CollectParams( + const ::google::protobuf::Map<TString, Ydb::TypedValue>& params, + NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> histgoram) +{ + + if (histgoram.IsCollecting()) { + size_t size = 0; + for (auto& keyvalue: params) { + size += keyvalue.second.ByteSizeLong(); + } + histgoram.Record(size); + } +} + +void TTableClient::TImpl::CollectQuerySize(const TString& query, NSdkStats::TAtomicHistogram<::NMonitoring::THistogram>& querySizeHistogram) { + if (querySizeHistogram.IsCollecting()) { + querySizeHistogram.Record(query.size()); + } +} + +void TTableClient::TImpl::CollectQuerySize(const TDataQuery&, NSdkStats::TAtomicHistogram<::NMonitoring::THistogram>&) {} + +void TTableClient::TImpl::SetTxSettings(const TTxSettings& txSettings, Ydb::Table::TransactionSettings* proto) +{ + switch (txSettings.Mode_) { + case TTxSettings::TS_SERIALIZABLE_RW: + proto->mutable_serializable_read_write(); + break; + case TTxSettings::TS_ONLINE_RO: + proto->mutable_online_read_only()->set_allow_inconsistent_reads( + txSettings.OnlineSettings_.AllowInconsistentReads_); + break; + case TTxSettings::TS_STALE_RO: + proto->mutable_stale_read_only(); + break; + case TTxSettings::TS_SNAPSHOT_RO: + proto->mutable_snapshot_read_only(); + break; + default: + throw TContractViolation("Unexpected transaction mode."); + } +} + +void TTableClient::TImpl::SetQuery(const TString& queryText, Ydb::Table::Query* query) { + query->set_yql_text(queryText); +} + +void TTableClient::TImpl::SetQuery(const TDataQuery& queryData, Ydb::Table::Query* query) { + query->set_id(queryData.GetId()); +} + +void TTableClient::TImpl::SetQueryCachePolicy(const TString&, const TExecDataQuerySettings& settings, + Ydb::Table::QueryCachePolicy* queryCachePolicy) +{ + queryCachePolicy->set_keep_in_cache(settings.KeepInQueryCache_ ? settings.KeepInQueryCache_.GetRef() : false); +} + +void TTableClient::TImpl::SetQueryCachePolicy(const TDataQuery&, const TExecDataQuerySettings& settings, + Ydb::Table::QueryCachePolicy* queryCachePolicy) { + queryCachePolicy->set_keep_in_cache(settings.KeepInQueryCache_ ? settings.KeepInQueryCache_.GetRef() : true); +} + +TMaybe<TString> TTableClient::TImpl::GetQueryText(const TString& queryText) { + return queryText; +} + +TMaybe<TString> TTableClient::TImpl::GetQueryText(const TDataQuery& queryData) { + return queryData.GetText(); +} + +} +} diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h new file mode 100644 index 0000000000..5c157465b1 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h @@ -0,0 +1,369 @@ +#pragma once + +#define INCLUDE_YDB_INTERNAL_H +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/scheme_helpers/helpers.h> +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/table_helpers/helpers.h> +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/make_request/make.h> +#undef INCLUDE_YDB_INTERNAL_H + +#include <ydb/public/sdk/cpp/client/resources/ydb_resources.h> +#include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h> +#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> +#include <ydb/public/api/grpc/ydb_table_v1.grpc.pb.h> + +#include <util/random/random.h> + +#include "client_session.h" +#include "data_query.h" +#include "request_migrator.h" +#include "readers.h" +#include "session_pool.h" + + +namespace NYdb { +namespace NTable { + +using namespace NThreading; + + +//How often run session pool keep alive check +constexpr TDuration PERIODIC_ACTION_INTERVAL = TDuration::Seconds(5); +//How ofter run host scan to perform session balancing +constexpr TDuration HOSTSCAN_PERIODIC_ACTION_INTERVAL = TDuration::Seconds(2); +constexpr ui64 KEEP_ALIVE_RANDOM_FRACTION = 4; +constexpr ui32 MAX_BACKOFF_DURATION_MS = TDuration::Hours(1).MilliSeconds(); +constexpr TDuration KEEP_ALIVE_CLIENT_TIMEOUT = TDuration::Seconds(5); + + +TDuration GetMinTimeToTouch(const TSessionPoolSettings& settings); +TDuration GetMaxTimeToTouch(const TSessionPoolSettings& settings); +ui32 CalcBackoffTime(const TBackoffSettings& settings, ui32 retryNumber); +bool IsSessionCloseRequested(const TStatus& status); +TDuration RandomizeThreshold(TDuration duration); +TDuration GetMinTimeToTouch(const TSessionPoolSettings& settings); +TDuration GetMaxTimeToTouch(const TSessionPoolSettings& settings); +TStatus GetStatus(const TOperation& operation); +TStatus GetStatus(const TStatus& status); + + +template<typename TResponse> +NThreading::TFuture<TResponse> InjectSessionStatusInterception( + std::shared_ptr<TSession::TImpl>& impl, NThreading::TFuture<TResponse> asyncResponse, + bool updateTimeout, + TDuration timeout, + std::function<void(const TResponse&, TSession::TImpl&)> cb = {}) +{ + auto promise = NThreading::NewPromise<TResponse>(); + asyncResponse.Subscribe([impl, promise, cb, updateTimeout, timeout](NThreading::TFuture<TResponse> future) mutable { + Y_VERIFY(future.HasValue()); + + // TResponse can hold refcounted user provided data (TSession for example) + // and we do not want to have copy of it (for example it can cause delay dtor call) + // so using move semantic here is mandatory. + // Also we must reset captured shared pointer to session impl + TResponse value = std::move(future.ExtractValue()); + + const TStatus& status = GetStatus(value); + // Exclude CLIENT_RESOURCE_EXHAUSTED from transport errors which can cause to session disconnect + // since we have guarantee this request wasn't been started to execute. + + if (status.IsTransportError() && status.GetStatus() != EStatus::CLIENT_RESOURCE_EXHAUSTED) { + impl->MarkBroken(); + } else if (status.GetStatus() == EStatus::SESSION_BUSY) { + impl->MarkBroken(); + } else if (status.GetStatus() == EStatus::BAD_SESSION) { + impl->MarkBroken(); + } else if (IsSessionCloseRequested(status)) { + impl->MarkAsClosing(); + } else { + // NOTE: About GetState and lock + // Simultanious call multiple requests on the same session make no sence, due to server limitation. + // But user can perform this call, right now we do not protect session from this, it may cause + // raise on session state if respoise is not success. + // It should not be a problem - in case of this race we close session + // or put it in to settler. + if (updateTimeout && status.GetStatus() != EStatus::CLIENT_RESOURCE_EXHAUSTED) { + impl->ScheduleTimeToTouch(RandomizeThreshold(timeout), impl->GetState() == TSession::TImpl::EState::S_ACTIVE); + } + } + if (cb) { + cb(value, *impl); + } + impl.reset(); + promise.SetValue(std::move(value)); + }); + return promise.GetFuture(); +} + + +class TTableClient::TImpl: public TClientImplCommon<TTableClient::TImpl>, public IMigratorClient { +public: + using TReadTableStreamProcessorPtr = TTablePartIterator::TReaderImpl::TStreamProcessorPtr; + using TScanQueryProcessorPtr = TScanQueryPartIterator::TReaderImpl::TStreamProcessorPtr; + + TImpl(std::shared_ptr<TGRpcConnectionsImpl>&& connections, const TClientSettings& settings); + ~TImpl(); + + bool LinkObjToEndpoint(const TEndpointKey& endpoint, TEndpointObj* obj, const void* tag); + void InitStopper(); + NThreading::TFuture<void> Drain(); + NThreading::TFuture<void> Stop(); + void ScheduleTask(const std::function<void()>& fn, TDuration timeout); + void ScheduleTaskUnsafe(std::function<void()>&& fn, TDuration timeout); + void AsyncBackoff(const TBackoffSettings& settings, ui32 retryNumber, const std::function<void()>& fn); + void StartPeriodicSessionPoolTask(); + static ui64 ScanForeignLocations(std::shared_ptr<TTableClient::TImpl> client); + static std::pair<ui64, size_t> ScanLocation(std::shared_ptr<TTableClient::TImpl> client, + std::unordered_map<ui64, size_t>& sessions, bool allNodes); + static NMath::TStats CalcCV(const std::unordered_map<ui64, size_t>& in); + void StartPeriodicHostScanTask(); + + TAsyncCreateSessionResult GetSession(const TCreateSessionSettings& settings); + i64 GetActiveSessionCount() const; + i64 GetActiveSessionsLimit() const; + i64 GetCurrentPoolSize() const; + TAsyncCreateSessionResult CreateSession(const TCreateSessionSettings& settings, bool standalone, + TString preferedLocation = TString()); + TAsyncKeepAliveResult KeepAlive(const TSession::TImpl* session, const TKeepAliveSettings& settings); + + TFuture<TStatus> CreateTable(Ydb::Table::CreateTableRequest&& request, const TCreateTableSettings& settings); + TFuture<TStatus> AlterTable(Ydb::Table::AlterTableRequest&& request, const TAlterTableSettings& settings); + TAsyncOperation AlterTableLong(Ydb::Table::AlterTableRequest&& request, const TAlterTableSettings& settings); + TFuture<TStatus> CopyTable(const TString& sessionId, const TString& src, const TString& dst, + const TCopyTableSettings& settings); + TFuture<TStatus> CopyTables(Ydb::Table::CopyTablesRequest&& request, const TCopyTablesSettings& settings); + TFuture<TStatus> RenameTables(Ydb::Table::RenameTablesRequest&& request, const TRenameTablesSettings& settings); + TFuture<TStatus> DropTable(const TString& sessionId, const TString& path, const TDropTableSettings& settings); + TAsyncDescribeTableResult DescribeTable(const TString& sessionId, const TString& path, const TDescribeTableSettings& settings); + + template<typename TParamsType> + TAsyncDataQueryResult ExecuteDataQuery(TSession& session, const TString& query, const TTxControl& txControl, + TParamsType params, const TExecDataQuerySettings& settings) { + auto maybeQuery = session.SessionImpl_->GetQueryFromCache(query, Settings_.AllowRequestMigration_); + if (maybeQuery) { + TDataQuery dataQuery(session, query, maybeQuery->QueryId, maybeQuery->ParameterTypes); + return ExecuteDataQuery(session, dataQuery, txControl, params, settings, true); + } + + CacheMissCounter.Inc(); + + return InjectSessionStatusInterception(session.SessionImpl_, + ExecuteDataQueryInternal(session, query, txControl, params, settings, false), + true, GetMinTimeToTouch(Settings_.SessionPoolSettings_)); + } + + template<typename TParamsType> + TAsyncDataQueryResult ExecuteDataQuery(TSession& session, const TDataQuery& dataQuery, const TTxControl& txControl, + TParamsType params, const TExecDataQuerySettings& settings, + bool fromCache) { + TString queryKey = dataQuery.Impl_->GetTextHash(); + auto cb = [queryKey](const TDataQueryResult& result, TSession::TImpl& session) { + if (result.GetStatus() == EStatus::NOT_FOUND) { + session.InvalidateQueryInCache(queryKey); + } + }; + + return InjectSessionStatusInterception<TDataQueryResult>( + session.SessionImpl_, + session.Client_->ExecuteDataQueryInternal(session, dataQuery, txControl, params, settings, fromCache), + true, + GetMinTimeToTouch(session.Client_->Settings_.SessionPoolSettings_), + cb); + } + + TAsyncPrepareQueryResult PrepareDataQuery(const TSession& session, const TString& query, + const TPrepareDataQuerySettings& settings); + TAsyncStatus ExecuteSchemeQuery(const TString& sessionId, const TString& query, + const TExecSchemeQuerySettings& settings); + + TAsyncBeginTransactionResult BeginTransaction(const TSession& session, const TTxSettings& txSettings, + const TBeginTxSettings& settings); + TAsyncCommitTransactionResult CommitTransaction(const TSession& session, const TTransaction& tx, + const TCommitTxSettings& settings); + TAsyncStatus RollbackTransaction(const TSession& session, const TTransaction& tx, + const TRollbackTxSettings& settings); + + TAsyncExplainDataQueryResult ExplainDataQuery(const TSession& session, const TString& query, + const TExplainDataQuerySettings& settings); + + static void SetTypedValue(Ydb::TypedValue* protoValue, const TValue& value); + + NThreading::TFuture<std::pair<TPlainStatus, TReadTableStreamProcessorPtr>> ReadTable( + const TString& sessionId, + const TString& path, + const TReadTableSettings& settings); + TAsyncReadRowsResult ReadRows(const TString& path, TValue&& keys, const TReadRowsSettings& settings); + + TAsyncStatus Close(const TSession::TImpl* sessionImpl, const TCloseSessionSettings& settings); + TAsyncStatus CloseInternal(const TSession::TImpl* sessionImpl); + + bool ReturnSession(TSession::TImpl* sessionImpl); + void DeleteSession(TSession::TImpl* sessionImpl); + ui32 GetSessionRetryLimit() const; + static void CloseAndDeleteSession( + std::unique_ptr<TSession::TImpl>&& impl, + std::shared_ptr<TTableClient::TImpl> client); + + + void SetStatCollector(const NSdkStats::TStatCollector::TClientStatCollector& collector); + + TAsyncBulkUpsertResult BulkUpsert(const TString& table, TValue&& rows, const TBulkUpsertSettings& settings); + TAsyncBulkUpsertResult BulkUpsert(const TString& table, EDataFormat format, + const TString& data, const TString& schema, const TBulkUpsertSettings& settings); + + TFuture<std::pair<TPlainStatus, TScanQueryProcessorPtr>> StreamExecuteScanQueryInternal(const TString& query, + const ::google::protobuf::Map<TString, Ydb::TypedValue>* params, + const TStreamExecScanQuerySettings& settings); + TAsyncScanQueryPartIterator StreamExecuteScanQuery(const TString& query, + const ::google::protobuf::Map<TString, Ydb::TypedValue>* params, + const TStreamExecScanQuerySettings& settings); +public: + TClientSettings Settings_; + +private: + static void SetParams( + ::google::protobuf::Map<TString, Ydb::TypedValue>* params, + Ydb::Table::ExecuteDataQueryRequest* request); + + static void SetParams( + const ::google::protobuf::Map<TString, Ydb::TypedValue>& params, + Ydb::Table::ExecuteDataQueryRequest* request); + + static void CollectParams( + ::google::protobuf::Map<TString, Ydb::TypedValue>* params, + NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> histgoram); + + static void CollectParams( + const ::google::protobuf::Map<TString, Ydb::TypedValue>& params, + NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> histgoram); + + static void CollectQuerySize(const TString& query, NSdkStats::TAtomicHistogram<::NMonitoring::THistogram>& querySizeHistogram); + + static void CollectQuerySize(const TDataQuery&, NSdkStats::TAtomicHistogram<::NMonitoring::THistogram>&); + + template <typename TQueryType, typename TParamsType> + TAsyncDataQueryResult ExecuteDataQueryInternal(const TSession& session, const TQueryType& query, + const TTxControl& txControl, TParamsType params, + const TExecDataQuerySettings& settings, bool fromCache + ) { + auto request = MakeOperationRequest<Ydb::Table::ExecuteDataQueryRequest>(settings); + request.set_session_id(session.GetId()); + auto txControlProto = request.mutable_tx_control(); + txControlProto->set_commit_tx(txControl.CommitTx_); + if (txControl.TxId_) { + txControlProto->set_tx_id(*txControl.TxId_); + } else { + SetTxSettings(txControl.BeginTx_, txControlProto->mutable_begin_tx()); + } + + request.set_collect_stats(GetStatsCollectionMode(settings.CollectQueryStats_)); + + SetQuery(query, request.mutable_query()); + CollectQuerySize(query, QuerySizeHistogram); + + SetParams(params, &request); + CollectParams(params, ParamsSizeHistogram); + + SetQueryCachePolicy(query, settings, request.mutable_query_cache_policy()); + + auto promise = NewPromise<TDataQueryResult>(); + bool keepInCache = settings.KeepInQueryCache_ && settings.KeepInQueryCache_.GetRef(); + + // We don't want to delay call of TSession dtor, so we can't capture it by copy + // otherwise we break session pool and other clients logic. + // Same problem with TDataQuery and TTransaction + // + // The fast solution is: + // - create copy of TSession out of lambda + // - capture pointer + // - call free just before SetValue call + auto sessionPtr = new TSession(session); + auto extractor = [promise, sessionPtr, query, fromCache, keepInCache] + (google::protobuf::Any* any, TPlainStatus status) mutable { + TVector<TResultSet> res; + TMaybe<TTransaction> tx; + TMaybe<TDataQuery> dataQuery; + TMaybe<TQueryStats> queryStats; + + auto queryText = GetQueryText(query); + if (any) { + Ydb::Table::ExecuteQueryResult result; + any->UnpackTo(&result); + + for (size_t i = 0; i < result.result_setsSize(); i++) { + res.push_back(TResultSet(*result.mutable_result_sets(i))); + } + + if (result.has_tx_meta()) { + tx = TTransaction(*sessionPtr, result.tx_meta().id()); + } + + if (result.has_query_meta()) { + if (queryText) { + auto& query_meta = result.query_meta(); + dataQuery = TDataQuery(*sessionPtr, *queryText, query_meta.id(), query_meta.parameters_types()); + } + } + + if (result.has_query_stats()) { + queryStats = TQueryStats(result.query_stats()); + } + } + + if (keepInCache && dataQuery && queryText) { + sessionPtr->SessionImpl_->AddQueryToCache(*dataQuery); + } + + TDataQueryResult dataQueryResult(TStatus(std::move(status)), + std::move(res), tx, dataQuery, fromCache, queryStats); + + delete sessionPtr; + tx.Clear(); + dataQuery.Clear(); + promise.SetValue(std::move(dataQueryResult)); + }; + + Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::ExecuteDataQueryRequest, Ydb::Table::ExecuteDataQueryResponse>( + std::move(request), + extractor, + &Ydb::Table::V1::TableService::Stub::AsyncExecuteDataQuery, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings), + session.SessionImpl_->GetEndpointKey()); + + return promise.GetFuture(); + } + + static void SetTxSettings(const TTxSettings& txSettings, Ydb::Table::TransactionSettings* proto); + + static void SetQuery(const TString& queryText, Ydb::Table::Query* query); + + static void SetQuery(const TDataQuery& queryData, Ydb::Table::Query* query); + + static void SetQueryCachePolicy(const TString&, const TExecDataQuerySettings& settings, + Ydb::Table::QueryCachePolicy* queryCachePolicy); + + static void SetQueryCachePolicy(const TDataQuery&, const TExecDataQuerySettings& settings, + Ydb::Table::QueryCachePolicy* queryCachePolicy); + + static TMaybe<TString> GetQueryText(const TString& queryText); + + static TMaybe<TString> GetQueryText(const TDataQuery& queryData); + +public: + NSdkStats::TAtomicCounter<::NMonitoring::TRate> CacheMissCounter; + NSdkStats::TStatCollector::TClientRetryOperationStatCollector RetryOperationStatCollector; + NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> QuerySizeHistogram; + NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> ParamsSizeHistogram; + NSdkStats::TAtomicCounter<::NMonitoring::TRate> SessionRemovedDueBalancing; + NSdkStats::TAtomicCounter<::NMonitoring::TRate> RequestMigrated; + +private: + TSessionPool SessionPool_; + TRequestMigrator RequestMigrator_; + static const TKeepAliveSettings KeepAliveSettings; +}; + +} +} diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp index 976f8c8b5b..9ef4048e16 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp @@ -9,12 +9,12 @@ #include <ydb/public/api/grpc/ydb_table_v1.grpc.pb.h> #include <ydb/public/api/protos/ydb_table.pb.h> #include <ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h> -#include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h> #include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> #include <ydb/public/sdk/cpp/client/ydb_value/value.h> #include <ydb/public/sdk/cpp/client/ydb_table/impl/client_session.h> #include <ydb/public/sdk/cpp/client/ydb_table/impl/data_query.h> #include <ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h> +#include <ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h> #include <ydb/public/sdk/cpp/client/resources/ydb_resources.h> #include <google/protobuf/util/time_util.h> @@ -32,15 +32,6 @@ namespace NTable { using namespace NThreading; -//How often run session pool keep alive check -constexpr TDuration PERIODIC_ACTION_INTERVAL = TDuration::Seconds(5); -//How ofter run host scan to perform session balancing -constexpr TDuration HOSTSCAN_PERIODIC_ACTION_INTERVAL = TDuration::Seconds(2); -constexpr ui64 KEEP_ALIVE_RANDOM_FRACTION = 4; -constexpr TDuration KEEP_ALIVE_CLIENT_TIMEOUT = TDuration::Seconds(5); -constexpr ui64 PERIODIC_ACTION_BATCH_SIZE = 10; //Max number of tasks to perform during one interval -constexpr ui32 MAX_BACKOFF_DURATION_MS = TDuration::Hours(1).MilliSeconds(); - //////////////////////////////////////////////////////////////////////////////// class TStorageSettings::TImpl { @@ -1234,64 +1225,6 @@ TTableDescription TTableBuilder::Build() { return TableDescription_; } -//////////////////////////////////////////////////////////////////////////////// - -class TTablePartIterator::TReaderImpl { -public: - using TSelf = TTablePartIterator::TReaderImpl; - using TResponse = Ydb::Table::ReadTableResponse; - using TStreamProcessorPtr = NGrpc::IStreamRequestReadProcessor<TResponse>::TPtr; - using TReadCallback = NGrpc::IStreamRequestReadProcessor<TResponse>::TReadCallback; - using TGRpcStatus = NGrpc::TGrpcStatus; - using TBatchReadResult = std::pair<TResponse, TGRpcStatus>; - - TReaderImpl(TStreamProcessorPtr streamProcessor, const TString& endpoint) - : StreamProcessor_(streamProcessor) - , Finished_(false) - , Endpoint_(endpoint) - {} - - ~TReaderImpl() { - StreamProcessor_->Cancel(); - } - - bool IsFinished() { - return Finished_; - } - - TAsyncSimpleStreamPart<TResultSet> ReadNext(std::shared_ptr<TSelf> self) { - auto promise = NThreading::NewPromise<TSimpleStreamPart<TResultSet>>(); - // Capture self - guarantee no dtor call during the read - auto readCb = [self, promise](TGRpcStatus&& grpcStatus) mutable { - std::optional<TReadTableSnapshot> snapshot; - if (self->Response_.has_snapshot()) { - snapshot.emplace( - self->Response_.snapshot().plan_step(), - self->Response_.snapshot().tx_id()); - } - if (!grpcStatus.Ok()) { - self->Finished_ = true; - promise.SetValue({TResultSet(std::move(*self->Response_.mutable_result()->mutable_result_set())), - TStatus(TPlainStatus(grpcStatus, self->Endpoint_)), - snapshot}); - } else { - NYql::TIssues issues; - NYql::IssuesFromMessage(self->Response_.issues(), issues); - EStatus clientStatus = static_cast<EStatus>(self->Response_.status()); - promise.SetValue({TResultSet(std::move(*self->Response_.mutable_result()->mutable_result_set())), - TStatus(clientStatus, std::move(issues)), - snapshot}); - } - }; - StreamProcessor_->Read(&Response_, readCb); - return promise.GetFuture(); - } -private: - TStreamProcessorPtr StreamProcessor_; - TResponse Response_; - bool Finished_; - TString Endpoint_; -}; TTablePartIterator::TTablePartIterator( std::shared_ptr<TReaderImpl> impl, @@ -1306,69 +1239,6 @@ TAsyncSimpleStreamPart<TResultSet> TTablePartIterator::ReadNext() { return ReaderImpl_->ReadNext(ReaderImpl_); } -//////////////////////////////////////////////////////////////////////////////// - -class TScanQueryPartIterator::TReaderImpl { -public: - using TSelf = TScanQueryPartIterator::TReaderImpl; - using TResponse = Ydb::Table::ExecuteScanQueryPartialResponse; - using TStreamProcessorPtr = NGrpc::IStreamRequestReadProcessor<TResponse>::TPtr; - using TReadCallback = NGrpc::IStreamRequestReadProcessor<TResponse>::TReadCallback; - using TGRpcStatus = NGrpc::TGrpcStatus; - using TBatchReadResult = std::pair<TResponse, TGRpcStatus>; - - TReaderImpl(TStreamProcessorPtr streamProcessor, const TString& endpoint) - : StreamProcessor_(streamProcessor) - , Finished_(false) - , Endpoint_(endpoint) - {} - - ~TReaderImpl() { - StreamProcessor_->Cancel(); - } - - bool IsFinished() const { - return Finished_; - } - - TAsyncScanQueryPart ReadNext(std::shared_ptr<TSelf> self) { - auto promise = NThreading::NewPromise<TScanQueryPart>(); - // Capture self - guarantee no dtor call during the read - auto readCb = [self, promise](TGRpcStatus&& grpcStatus) mutable { - if (!grpcStatus.Ok()) { - self->Finished_ = true; - promise.SetValue({TStatus(TPlainStatus(grpcStatus, self->Endpoint_))}); - } else { - NYql::TIssues issues; - NYql::IssuesFromMessage(self->Response_.issues(), issues); - EStatus clientStatus = static_cast<EStatus>(self->Response_.status()); - // TODO: Add headers for streaming calls. - TPlainStatus plainStatus{clientStatus, std::move(issues), self->Endpoint_, {}}; - TStatus status{std::move(plainStatus)}; - TMaybe<TQueryStats> queryStats; - - if (self->Response_.result().has_query_stats()) { - queryStats = TQueryStats(self->Response_.result().query_stats()); - } - - if (self->Response_.result().has_result_set()) { - promise.SetValue({std::move(status), - TResultSet(std::move(*self->Response_.mutable_result()->mutable_result_set())), queryStats}); - } else { - promise.SetValue({std::move(status), queryStats}); - } - } - }; - StreamProcessor_->Read(&Response_, readCb); - return promise.GetFuture(); - } -private: - TStreamProcessorPtr StreamProcessor_; - TResponse Response_; - bool Finished_; - TString Endpoint_; -}; - TScanQueryPartIterator::TScanQueryPartIterator( std::shared_ptr<TReaderImpl> impl, TPlainStatus&& status) @@ -1382,1589 +1252,7 @@ TAsyncScanQueryPart TScanQueryPartIterator::ReadNext() { return ReaderImpl_->ReadNext(ReaderImpl_); } -//////////////////////////////////////////////////////////////////////////////// - -class TSessionPoolImpl { - typedef TAsyncCreateSessionResult - (*TAwareSessonProvider) - (std::shared_ptr<TTableClient::TImpl> client, const TCreateSessionSettings& settings); -public: - using TKeepAliveCmd = std::function<void(TSession session)>; - using TDeletePredicate = std::function<bool(TSession::TImpl* session, TTableClient::TImpl* client, size_t sessionsCount)>; - TSessionPoolImpl(ui32 maxActiveSessions); - // TAwareSessonProvider: - // function is called if session pool is empty, - // this is used for additional total session count limitation - TAsyncCreateSessionResult GetSession( - std::shared_ptr<TTableClient::TImpl> client, - const TCreateSessionSettings& settings); - // Returns true if session was extracted from session pool and dropped via smart deleter - // Returns false if session for given endpoint was not found - // NOTE: O(n) under session pool lock, should not be used often - bool DropSessionOnEndpoint(std::shared_ptr<TTableClient::TImpl> client, ui64 nodeId); - // Returns true if session returned to pool successfully - bool ReturnSession(TSession::TImpl* impl, bool active); - TPeriodicCb CreatePeriodicTask(std::weak_ptr<TTableClient::TImpl> weakClient, TKeepAliveCmd&& cmd, TDeletePredicate&& predicate); - i64 GetActiveSessions() const; - i64 GetActiveSessionsLimit() const; - i64 GetCurrentPoolSize() const; - void DecrementActiveCounter(); - - void Drain(std::function<bool(std::unique_ptr<TSession::TImpl>&&)> cb, bool close); - void SetStatCollector(NSdkStats::TStatCollector::TSessionPoolStatCollector collector); - - static void CreateFakeSession(NThreading::TPromise<TCreateSessionResult>& promise, - std::shared_ptr<TTableClient::TImpl> client); -private: - void UpdateStats(); - - mutable std::mutex Mtx_; - TMultiMap<TInstant, std::unique_ptr<TSession::TImpl>> Sessions_; - bool Closed_; - i64 ActiveSessions_; - const ui32 MaxActiveSessions_; - NSdkStats::TSessionCounter ActiveSessionsCounter_; - NSdkStats::TSessionCounter InPoolSessionsCounter_; - NSdkStats::TAtomicCounter<::NMonitoring::TRate> FakeSessionsCounter_; -}; - -static TDuration RandomizeThreshold(TDuration duration) { - TDuration::TValue value = duration.GetValue(); - if (KEEP_ALIVE_RANDOM_FRACTION) { - const i64 randomLimit = value / KEEP_ALIVE_RANDOM_FRACTION; - if (randomLimit < 2) - return duration; - value += static_cast<i64>(RandomNumber<ui64>(randomLimit)); - } - return TDuration::FromValue(value); -} - -static TDuration GetMinTimeToTouch(const TSessionPoolSettings& settings) { - return Min(settings.CloseIdleThreshold_, settings.KeepAliveIdleThreshold_); -} - -static TDuration GetMaxTimeToTouch(const TSessionPoolSettings& settings) { - return Max(settings.CloseIdleThreshold_, settings.KeepAliveIdleThreshold_); -} - -static TStatus GetStatus(const TOperation& operation) { - return operation.Status(); -} - -static TStatus GetStatus(const TStatus& status) { - return status; -} - -static bool IsSessionCloseRequested(const TStatus& status) { - const auto& meta = status.GetResponseMetadata(); - auto hints = meta.equal_range(NYdb::YDB_SERVER_HINTS); - for(auto it = hints.first; it != hints.second; ++it) { - if (it->second == NYdb::YDB_SESSION_CLOSE) { - return true; - } - } - - return false; -} - -template<typename TResponse> -NThreading::TFuture<TResponse> InjectSessionStatusInterception( - std::shared_ptr<TSession::TImpl>& impl, NThreading::TFuture<TResponse> asyncResponse, - bool updateTimeout, - TDuration timeout, - std::function<void(const TResponse&, TSession::TImpl&)> cb = {}) -{ - auto promise = NThreading::NewPromise<TResponse>(); - asyncResponse.Subscribe([impl, promise, cb, updateTimeout, timeout](NThreading::TFuture<TResponse> future) mutable { - Y_VERIFY(future.HasValue()); - - // TResponse can hold refcounted user provided data (TSession for example) - // and we do not want to have copy of it (for example it can cause delay dtor call) - // so using move semantic here is mandatory. - // Also we must reset captured shared pointer to session impl - TResponse value = std::move(future.ExtractValue()); - - const TStatus& status = GetStatus(value); - // Exclude CLIENT_RESOURCE_EXHAUSTED from transport errors which can cause to session disconnect - // since we have guarantee this request wasn't been started to execute. - - if (status.IsTransportError() && status.GetStatus() != EStatus::CLIENT_RESOURCE_EXHAUSTED) { - impl->MarkBroken(); - } else if (status.GetStatus() == EStatus::SESSION_BUSY) { - impl->MarkBroken(); - } else if (status.GetStatus() == EStatus::BAD_SESSION) { - impl->MarkBroken(); - } else if (IsSessionCloseRequested(status)) { - impl->MarkAsClosing(); - } else { - // NOTE: About GetState and lock - // Simultanious call multiple requests on the same session make no sence, due to server limitation. - // But user can perform this call, right now we do not protect session from this, it may cause - // raise on session state if respoise is not success. - // It should not be a problem - in case of this race we close session - // or put it in to settler. - if (updateTimeout && status.GetStatus() != EStatus::CLIENT_RESOURCE_EXHAUSTED) { - impl->ScheduleTimeToTouch(RandomizeThreshold(timeout), impl->GetState() == TSession::TImpl::EState::S_ACTIVE); - } - } - if (cb) { - cb(value, *impl); - } - impl.reset(); - promise.SetValue(std::move(value)); - }); - return promise.GetFuture(); -} - -static ui32 CalcBackoffTime(const TBackoffSettings& settings, ui32 retryNumber) { - ui32 backoffSlots = 1 << std::min(retryNumber, settings.Ceiling_); - TDuration maxDuration = settings.SlotDuration_ * backoffSlots; - - double uncertaintyRatio = std::max(std::min(settings.UncertainRatio_, 1.0), 0.0); - double uncertaintyMultiplier = RandomNumber<double>() * uncertaintyRatio - uncertaintyRatio + 1.0; - - double durationMs = round(maxDuration.MilliSeconds() * uncertaintyMultiplier); - - return std::max(std::min(durationMs, (double)MAX_BACKOFF_DURATION_MS), 0.0); -} - -//////////////////////////////////////////////////////////////////////////////// - -class TTableClient::TImpl: public TClientImplCommon<TTableClient::TImpl>, public IMigratorClient { -public: - using TReadTableStreamProcessorPtr = TTablePartIterator::TReaderImpl::TStreamProcessorPtr; - using TScanQueryProcessorPtr = TScanQueryPartIterator::TReaderImpl::TStreamProcessorPtr; - - TImpl(std::shared_ptr<TGRpcConnectionsImpl>&& connections, const TClientSettings& settings) - : TClientImplCommon(std::move(connections), settings) - , Settings_(settings) - , SessionPool_(Settings_.SessionPoolSettings_.MaxActiveSessions_) - { - if (!DbDriverState_->StatCollector.IsCollecting()) { - return; - } - - SetStatCollector(DbDriverState_->StatCollector.GetClientStatCollector()); - SessionPool_.SetStatCollector(DbDriverState_->StatCollector.GetSessionPoolStatCollector()); - } - - ~TImpl() { - if (Connections_->GetDrainOnDtors()) { - Drain().Wait(); - } - } - - bool LinkObjToEndpoint(const TEndpointKey& endpoint, TEndpointObj* obj, const void* tag) { - return DbDriverState_->EndpointPool.LinkObjToEndpoint(endpoint, obj, tag); - } - - void InitStopper() { - std::weak_ptr<TTableClient::TImpl> weak = shared_from_this(); - auto cb = [weak]() mutable { - auto strong = weak.lock(); - if (!strong) { - auto promise = NThreading::NewPromise<void>(); - promise.SetException("no more client"); - return promise.GetFuture(); - } - return strong->Drain(); - }; - - DbDriverState_->AddCb(std::move(cb), TDbDriverState::ENotifyType::STOP); - } - - NThreading::TFuture<void> Drain() { - TVector<std::unique_ptr<TSession::TImpl>> sessions; - // No realocations under lock - sessions.reserve(Settings_.SessionPoolSettings_.MaxActiveSessions_); - auto drainer = [&sessions](std::unique_ptr<TSession::TImpl>&& impl) mutable { - sessions.push_back(std::move(impl)); - return true; - }; - SessionPool_.Drain(drainer, true); - TVector<TAsyncStatus> closeResults; - for (auto& s : sessions) { - if (s->GetId()) { - closeResults.push_back(CloseInternal(s.get())); - } - } - sessions.clear(); - return NThreading::WaitExceptionOrAll(closeResults); - } - - NThreading::TFuture<void> Stop() { - return Drain(); - } - - void ScheduleTask(const std::function<void()>& fn, TDuration timeout) { - std::weak_ptr<TTableClient::TImpl> weak = shared_from_this(); - auto cbGuard = [weak, fn]() { - auto strongClient = weak.lock(); - if (strongClient) { - fn(); - } - }; - Connections_->ScheduleOneTimeTask(std::move(cbGuard), timeout); - } - - void ScheduleTaskUnsafe(std::function<void()>&& fn, TDuration timeout) { - Connections_->ScheduleOneTimeTask(std::move(fn), timeout); - } - - void AsyncBackoff(const TBackoffSettings& settings, ui32 retryNumber, const std::function<void()>& fn) { - auto durationMs = CalcBackoffTime(settings, retryNumber); - ScheduleTask(fn, TDuration::MilliSeconds(durationMs)); - } - - void StartPeriodicSessionPoolTask() { - - auto deletePredicate = [](TSession::TImpl* session, TTableClient::TImpl* client, size_t sessionsCount) { - - const auto sessionPoolSettings = client->Settings_.SessionPoolSettings_; - const auto spentTime = session->GetTimeToTouchFast() - session->GetTimeInPastFast(); - - if (spentTime >= sessionPoolSettings.CloseIdleThreshold_) { - if (sessionsCount > sessionPoolSettings.MinPoolSize_) { - return true; - } - } - - return false; - }; - - auto keepAliveCmd = [](TSession session) { - Y_VERIFY(session.GetId()); - - const auto sessionPoolSettings = session.Client_->Settings_.SessionPoolSettings_; - const auto spentTime = session.SessionImpl_->GetTimeToTouchFast() - session.SessionImpl_->GetTimeInPastFast(); - - const auto maxTimeToTouch = GetMaxTimeToTouch(session.Client_->Settings_.SessionPoolSettings_); - const auto minTimeToTouch = GetMinTimeToTouch(session.Client_->Settings_.SessionPoolSettings_); - - auto calcTimeToNextTouch = [maxTimeToTouch, minTimeToTouch] (const TDuration spent) { - auto timeToNextTouch = minTimeToTouch; - if (maxTimeToTouch > spent) { - auto t = maxTimeToTouch - spent; - timeToNextTouch = Min(t, minTimeToTouch); - } - return timeToNextTouch; - }; - - if (spentTime >= sessionPoolSettings.KeepAliveIdleThreshold_) { - - // Handle of session status will be done inside InjectSessionStatusInterception routine. - // We just need to reschedule time to next call because InjectSessionStatusInterception doesn't - // update timeInPast for calls from internal keep alive routine - session.KeepAlive(KeepAliveSettings) - .Subscribe([spentTime, session, maxTimeToTouch, calcTimeToNextTouch](TAsyncKeepAliveResult asyncResult) { - if (!asyncResult.GetValue().IsSuccess()) - return; - - if (spentTime >= maxTimeToTouch) { - auto timeToNextTouch = calcTimeToNextTouch(spentTime); - session.SessionImpl_->ScheduleTimeToTouchFast(timeToNextTouch, true); - } - }); - return; - } - - auto timeToNextTouch = calcTimeToNextTouch(spentTime); - session.SessionImpl_->ScheduleTimeToTouchFast( - RandomizeThreshold(timeToNextTouch), - spentTime >= maxTimeToTouch - ); - }; - - std::weak_ptr<TTableClient::TImpl> weak = shared_from_this(); - Connections_->AddPeriodicTask( - SessionPool_.CreatePeriodicTask( - weak, - std::move(keepAliveCmd), - std::move(deletePredicate) - ), PERIODIC_ACTION_INTERVAL); - } - - static ui64 ScanForeignLocations(std::shared_ptr<TTableClient::TImpl> client) { - size_t max = 0; - ui64 result = 0; - - auto cb = [&result, &max](ui64 nodeId, const IObjRegistryHandle& handle) { - const auto sz = handle.Size(); - if (sz > max) { - result = nodeId; - max = sz; - } - }; - - client->DbDriverState_->ForEachForeignEndpoint(cb, client.get()); - - return result; - } - - static std::pair<ui64, size_t> ScanLocation(std::shared_ptr<TTableClient::TImpl> client, - std::unordered_map<ui64, size_t>& sessions, bool allNodes) - { - std::pair<ui64, size_t> result = {0, 0}; - - auto cb = [&result, &sessions](ui64 nodeId, const IObjRegistryHandle& handle) { - const auto sz = handle.Size(); - sessions.insert({nodeId, sz}); - if (sz > result.second) { - result.first = nodeId; - result.second = sz; - } - }; - - if (allNodes) { - client->DbDriverState_->ForEachEndpoint(cb, client.get()); - } else { - client->DbDriverState_->ForEachLocalEndpoint(cb, client.get()); - } - - return result; - } - - static NMath::TStats CalcCV(const std::unordered_map<ui64, size_t>& in) { - TVector<size_t> t; - t.reserve(in.size()); - std::transform(in.begin(), in.end(), std::back_inserter(t), [](const std::pair<ui64, size_t>& pair) { - return pair.second; - }); - return NMath::CalcCV(t); - } - - void StartPeriodicHostScanTask() { - std::weak_ptr<TTableClient::TImpl> weak = shared_from_this(); - - // The future in completed when we have finished current migrate task - // and ready to accept new one - std::pair<ui64, size_t> winner = {0, 0}; - - auto periodicCb = [weak, winner](NYql::TIssues&&, EStatus status) mutable -> bool { - - if (status != EStatus::SUCCESS) { - return false; - } - - auto strongClient = weak.lock(); - if (!strongClient) { - return false; - } else { - TRequestMigrator& migrator = strongClient->RequestMigrator_; - - const auto balancingPolicy = strongClient->DbDriverState_->GetBalancingPolicy(); - - // Try to find any host at foreign locations if prefer local dc - const ui64 foreignHost = (balancingPolicy == EBalancingPolicy::UsePreferableLocation) ? - ScanForeignLocations(strongClient) : 0; - - std::unordered_map<ui64, size_t> hostMap; - - winner = ScanLocation(strongClient, hostMap, - balancingPolicy == EBalancingPolicy::UseAllNodes); - - bool forceMigrate = false; - - // There is host in foreign locations - if (foreignHost) { - // But no hosts at local - if (hostMap.empty()) { - Y_VERIFY(!winner.second); - // Scan whole cluster - we have no local dc - winner = ScanLocation(strongClient, hostMap, true); - } else { - // We have local and foreign hosts, so force migration to local one - forceMigrate = true; - // Just replace source - winner.first = foreignHost; - winner.second++; - } - } - - const auto minCv = strongClient->Settings_.MinSessionCV_; - - const auto stats = CalcCV(hostMap); - - strongClient->DbDriverState_->StatCollector.SetSessionCV(stats.Cv); - - // Just scan to update monitoring counter ^^ - // Balancing feature is disabled. - if (!minCv) - return true; - - if (hostMap.size() < 2) - return true; - - // Migrate last session only if move from foreign to local - if (!forceMigrate && winner.second < 2) - return true; - - if (stats.Cv > minCv || forceMigrate) { - migrator.SetHost(winner.first); - } else { - migrator.SetHost(0); - } - return true; - } - }; - - Connections_->AddPeriodicTask(std::move(periodicCb), HOSTSCAN_PERIODIC_ACTION_INTERVAL); - } - - TAsyncCreateSessionResult GetSession(const TCreateSessionSettings& settings) { - return SessionPool_.GetSession(shared_from_this(), settings); - } - - i64 GetActiveSessionCount() const { - return SessionPool_.GetActiveSessions(); - } - - i64 GetActiveSessionsLimit() const { - return SessionPool_.GetActiveSessionsLimit(); - } - - i64 GetCurrentPoolSize() const { - return SessionPool_.GetCurrentPoolSize(); - } - - TAsyncCreateSessionResult CreateSession(const TCreateSessionSettings& settings, bool standalone, - TString preferedLocation = TString()) - { - auto request = MakeOperationRequest<Ydb::Table::CreateSessionRequest>(settings); - - auto createSessionPromise = NewPromise<TCreateSessionResult>(); - auto self = shared_from_this(); - auto rpcSettings = TRpcRequestSettings::Make(settings); - rpcSettings.Header.push_back({NYdb::YDB_CLIENT_CAPABILITIES, NYdb::YDB_CLIENT_CAPABILITY_SESSION_BALANCER}); - - auto createSessionExtractor = [createSessionPromise, self, standalone] - (google::protobuf::Any* any, TPlainStatus status) mutable { - Ydb::Table::CreateSessionResult result; - if (any) { - any->UnpackTo(&result); - } - auto session = TSession(self, result.session_id(), status.Endpoint); - if (status.Ok()) { - if (standalone) { - session.SessionImpl_->MarkStandalone(); - } else { - session.SessionImpl_->MarkActive(); - } - self->DbDriverState_->StatCollector.IncSessionsOnHost(status.Endpoint); - } else { - // We do not use SessionStatusInterception for CreateSession request - session.SessionImpl_->MarkBroken(); - } - TCreateSessionResult val(TStatus(std::move(status)), std::move(session)); - createSessionPromise.SetValue(std::move(val)); - }; - - Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::CreateSessionRequest, Ydb::Table::CreateSessionResponse>( - std::move(request), - createSessionExtractor, - &Ydb::Table::V1::TableService::Stub::AsyncCreateSession, - DbDriverState_, - INITIAL_DEFERRED_CALL_DELAY, - rpcSettings, - TEndpointKey(preferedLocation, 0)); - - std::weak_ptr<TDbDriverState> state = DbDriverState_; - - return createSessionPromise.GetFuture(); - } - - TAsyncKeepAliveResult KeepAlive(const TSession::TImpl* session, const TKeepAliveSettings& settings) { - auto request = MakeOperationRequest<Ydb::Table::KeepAliveRequest>(settings); - request.set_session_id(session->GetId()); - - auto keepAliveResultPromise = NewPromise<TKeepAliveResult>(); - auto self = shared_from_this(); - - auto keepAliveExtractor = [keepAliveResultPromise, self] - (google::protobuf::Any* any, TPlainStatus status) mutable { - Ydb::Table::KeepAliveResult result; - ESessionStatus sessionStatus = ESessionStatus::Unspecified; - if (any) { - any->UnpackTo(&result); - - switch (result.session_status()) { - case Ydb::Table::KeepAliveResult_SessionStatus_SESSION_STATUS_READY: - sessionStatus = ESessionStatus::Ready; - break; - case Ydb::Table::KeepAliveResult_SessionStatus_SESSION_STATUS_BUSY: - sessionStatus = ESessionStatus::Busy; - break; - default: - sessionStatus = ESessionStatus::Unspecified; - } - } - TKeepAliveResult val(TStatus(std::move(status)), sessionStatus); - keepAliveResultPromise.SetValue(std::move(val)); - }; - - Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::KeepAliveRequest, Ydb::Table::KeepAliveResponse>( - std::move(request), - keepAliveExtractor, - &Ydb::Table::V1::TableService::Stub::AsyncKeepAlive, - DbDriverState_, - INITIAL_DEFERRED_CALL_DELAY, - TRpcRequestSettings::Make(settings), - session->GetEndpointKey()); - - return keepAliveResultPromise.GetFuture(); - } - - TFuture<TStatus> CreateTable(Ydb::Table::CreateTableRequest&& request, const TCreateTableSettings& settings) - { - return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::CreateTableRequest,Ydb::Table::CreateTableResponse>( - std::move(request), - &Ydb::Table::V1::TableService::Stub::AsyncCreateTable, - TRpcRequestSettings::Make(settings)); - } - - TFuture<TStatus> AlterTable(Ydb::Table::AlterTableRequest&& request, const TAlterTableSettings& settings) - { - return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::AlterTableRequest, Ydb::Table::AlterTableResponse>( - std::move(request), - &Ydb::Table::V1::TableService::Stub::AsyncAlterTable, - TRpcRequestSettings::Make(settings)); - } - - TAsyncOperation AlterTableLong(Ydb::Table::AlterTableRequest&& request, const TAlterTableSettings& settings) - { - using Ydb::Table::V1::TableService; - using Ydb::Table::AlterTableRequest; - using Ydb::Table::AlterTableResponse; - return RunOperation<TableService, AlterTableRequest, AlterTableResponse, TOperation>( - std::move(request), - &Ydb::Table::V1::TableService::Stub::AsyncAlterTable, - TRpcRequestSettings::Make(settings)); - } - - TFuture<TStatus> CopyTable(const TString& sessionId, const TString& src, const TString& dst, - const TCopyTableSettings& settings) - { - auto request = MakeOperationRequest<Ydb::Table::CopyTableRequest>(settings); - request.set_session_id(sessionId); - request.set_source_path(src); - request.set_destination_path(dst); - - return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::CopyTableRequest, Ydb::Table::CopyTableResponse>( - std::move(request), - &Ydb::Table::V1::TableService::Stub::AsyncCopyTable, - TRpcRequestSettings::Make(settings)); - } - - TFuture<TStatus> CopyTables(Ydb::Table::CopyTablesRequest&& request, const TCopyTablesSettings& settings) - { - return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::CopyTablesRequest, Ydb::Table::CopyTablesResponse>( - std::move(request), - &Ydb::Table::V1::TableService::Stub::AsyncCopyTables, - TRpcRequestSettings::Make(settings)); - } - - TFuture<TStatus> RenameTables(Ydb::Table::RenameTablesRequest&& request, const TRenameTablesSettings& settings) - { - return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::RenameTablesRequest, Ydb::Table::RenameTablesResponse>( - std::move(request), - &Ydb::Table::V1::TableService::Stub::AsyncRenameTables, - TRpcRequestSettings::Make(settings)); - } - - TFuture<TStatus> DropTable(const TString& sessionId, const TString& path, const TDropTableSettings& settings) { - auto request = MakeOperationRequest<Ydb::Table::DropTableRequest>(settings); - request.set_session_id(sessionId); - request.set_path(path); - - return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::DropTableRequest, Ydb::Table::DropTableResponse>( - std::move(request), - &Ydb::Table::V1::TableService::Stub::AsyncDropTable, - TRpcRequestSettings::Make(settings)); - } - - TAsyncDescribeTableResult DescribeTable(const TString& sessionId, const TString& path, const TDescribeTableSettings& settings) { - auto request = MakeOperationRequest<Ydb::Table::DescribeTableRequest>(settings); - request.set_session_id(sessionId); - request.set_path(path); - if (settings.WithKeyShardBoundary_) { - request.set_include_shard_key_bounds(true); - } - - if (settings.WithTableStatistics_) { - request.set_include_table_stats(true); - } - - if (settings.WithPartitionStatistics_) { - request.set_include_partition_stats(true); - } - - auto promise = NewPromise<TDescribeTableResult>(); - - auto extractor = [promise, settings] - (google::protobuf::Any* any, TPlainStatus status) mutable { - Ydb::Table::DescribeTableResult result; - if (any) { - any->UnpackTo(&result); - } - TDescribeTableResult describeTableResult(TStatus(std::move(status)), - std::move(result), settings); - promise.SetValue(std::move(describeTableResult)); - }; - - Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::DescribeTableRequest, Ydb::Table::DescribeTableResponse>( - std::move(request), - extractor, - &Ydb::Table::V1::TableService::Stub::AsyncDescribeTable, - DbDriverState_, - INITIAL_DEFERRED_CALL_DELAY, - TRpcRequestSettings::Make(settings)); - - return promise.GetFuture(); - } - - template<typename TParamsType> - TAsyncDataQueryResult ExecuteDataQuery(TSession& session, const TString& query, const TTxControl& txControl, - TParamsType params, const TExecDataQuerySettings& settings) - { - auto maybeQuery = session.SessionImpl_->GetQueryFromCache(query, Settings_.AllowRequestMigration_); - if (maybeQuery) { - TDataQuery dataQuery(session, query, maybeQuery->QueryId, maybeQuery->ParameterTypes); - return ExecuteDataQuery(session, dataQuery, txControl, params, settings, true); - } - - CacheMissCounter.Inc(); - - return InjectSessionStatusInterception(session.SessionImpl_, - ExecuteDataQueryInternal(session, query, txControl, params, settings, false), - true, GetMinTimeToTouch(Settings_.SessionPoolSettings_)); - } - - template<typename TParamsType> - TAsyncDataQueryResult ExecuteDataQuery(TSession& session, const TDataQuery& dataQuery, const TTxControl& txControl, - TParamsType params, const TExecDataQuerySettings& settings, - bool fromCache) - { - TString queryKey = dataQuery.Impl_->GetTextHash(); - auto cb = [queryKey](const TDataQueryResult& result, TSession::TImpl& session) { - if (result.GetStatus() == EStatus::NOT_FOUND) { - session.InvalidateQueryInCache(queryKey); - } - }; - - return InjectSessionStatusInterception<TDataQueryResult>( - session.SessionImpl_, - session.Client_->ExecuteDataQueryInternal(session, dataQuery, txControl, params, settings, fromCache), - true, - GetMinTimeToTouch(session.Client_->Settings_.SessionPoolSettings_), - cb); - } - - TAsyncPrepareQueryResult PrepareDataQuery(const TSession& session, const TString& query, - const TPrepareDataQuerySettings& settings) - { - auto request = MakeOperationRequest<Ydb::Table::PrepareDataQueryRequest>(settings); - request.set_session_id(session.GetId()); - request.set_yql_text(query); - - auto promise = NewPromise<TPrepareQueryResult>(); - - //See ExecuteDataQueryInternal for explanation - auto sessionPtr = new TSession(session); - auto extractor = [promise, sessionPtr, query] - (google::protobuf::Any* any, TPlainStatus status) mutable { - TDataQuery dataQuery(*sessionPtr, query, ""); - - if (any) { - Ydb::Table::PrepareQueryResult result; - any->UnpackTo(&result); - - if (status.Ok()) { - dataQuery = TDataQuery(*sessionPtr, query, result.query_id(), result.parameters_types()); - sessionPtr->SessionImpl_->AddQueryToCache(dataQuery); - } - } - - TPrepareQueryResult prepareQueryResult(TStatus(std::move(status)), - dataQuery, false); - delete sessionPtr; - promise.SetValue(std::move(prepareQueryResult)); - }; - - CollectQuerySize(query, QuerySizeHistogram); - - Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::PrepareDataQueryRequest, Ydb::Table::PrepareDataQueryResponse>( - std::move(request), - extractor, - &Ydb::Table::V1::TableService::Stub::AsyncPrepareDataQuery, - DbDriverState_, - INITIAL_DEFERRED_CALL_DELAY, - TRpcRequestSettings::Make(settings), - session.SessionImpl_->GetEndpointKey()); - - return promise.GetFuture(); - } - - TAsyncStatus ExecuteSchemeQuery(const TString& sessionId, const TString& query, - const TExecSchemeQuerySettings& settings) - { - auto request = MakeOperationRequest<Ydb::Table::ExecuteSchemeQueryRequest>(settings); - request.set_session_id(sessionId); - request.set_yql_text(query); - - return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::ExecuteSchemeQueryRequest, Ydb::Table::ExecuteSchemeQueryResponse>( - std::move(request), - &Ydb::Table::V1::TableService::Stub::AsyncExecuteSchemeQuery, - TRpcRequestSettings::Make(settings)); - } - - TAsyncBeginTransactionResult BeginTransaction(const TSession& session, const TTxSettings& txSettings, - const TBeginTxSettings& settings) - { - auto request = MakeOperationRequest<Ydb::Table::BeginTransactionRequest>(settings); - request.set_session_id(session.GetId()); - SetTxSettings(txSettings, request.mutable_tx_settings()); - - auto promise = NewPromise<TBeginTransactionResult>(); - - auto extractor = [promise, session] - (google::protobuf::Any* any, TPlainStatus status) mutable { - TString txId; - if (any) { - Ydb::Table::BeginTransactionResult result; - any->UnpackTo(&result); - txId = result.tx_meta().id(); - } - - TBeginTransactionResult beginTxResult(TStatus(std::move(status)), - TTransaction(session, txId)); - promise.SetValue(std::move(beginTxResult)); - }; - - Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::BeginTransactionRequest, Ydb::Table::BeginTransactionResponse>( - std::move(request), - extractor, - &Ydb::Table::V1::TableService::Stub::AsyncBeginTransaction, - DbDriverState_, - INITIAL_DEFERRED_CALL_DELAY, - TRpcRequestSettings::Make(settings), - session.SessionImpl_->GetEndpointKey()); - - return promise.GetFuture(); - } - - TAsyncCommitTransactionResult CommitTransaction(const TSession& session, const TTransaction& tx, - const TCommitTxSettings& settings) - { - auto request = MakeOperationRequest<Ydb::Table::CommitTransactionRequest>(settings); - request.set_session_id(session.GetId()); - request.set_tx_id(tx.GetId()); - request.set_collect_stats(GetStatsCollectionMode(settings.CollectQueryStats_)); - - auto promise = NewPromise<TCommitTransactionResult>(); - - auto extractor = [promise] - (google::protobuf::Any* any, TPlainStatus status) mutable { - TMaybe<TQueryStats> queryStats; - if (any) { - Ydb::Table::CommitTransactionResult result; - any->UnpackTo(&result); - - if (result.has_query_stats()) { - queryStats = TQueryStats(result.query_stats()); - } - } - - TCommitTransactionResult commitTxResult(TStatus(std::move(status)), queryStats); - promise.SetValue(std::move(commitTxResult)); - }; - - Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::CommitTransactionRequest, Ydb::Table::CommitTransactionResponse>( - std::move(request), - extractor, - &Ydb::Table::V1::TableService::Stub::AsyncCommitTransaction, - DbDriverState_, - INITIAL_DEFERRED_CALL_DELAY, - TRpcRequestSettings::Make(settings), - session.SessionImpl_->GetEndpointKey()); - - return promise.GetFuture(); - } - - TAsyncStatus RollbackTransaction(const TSession& session, const TTransaction& tx, - const TRollbackTxSettings& settings) - { - auto request = MakeOperationRequest<Ydb::Table::RollbackTransactionRequest>(settings); - request.set_session_id(session.GetId()); - request.set_tx_id(tx.GetId()); - - return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::RollbackTransactionRequest, Ydb::Table::RollbackTransactionResponse>( - std::move(request), - &Ydb::Table::V1::TableService::Stub::AsyncRollbackTransaction, - TRpcRequestSettings::Make(settings), - session.SessionImpl_->GetEndpointKey()); - } - - TAsyncExplainDataQueryResult ExplainDataQuery(const TSession& session, const TString& query, - const TExplainDataQuerySettings& settings) - { - auto request = MakeOperationRequest<Ydb::Table::ExplainDataQueryRequest>(settings); - request.set_session_id(session.GetId()); - request.set_yql_text(query); - - auto promise = NewPromise<TExplainQueryResult>(); - - auto extractor = [promise] - (google::protobuf::Any* any, TPlainStatus status) mutable { - TString ast; - TString plan; - if (any) { - Ydb::Table::ExplainQueryResult result; - any->UnpackTo(&result); - ast = result.query_ast(); - plan = result.query_plan(); - } - TExplainQueryResult val(TStatus(std::move(status)), - std::move(plan), std::move(ast)); - promise.SetValue(std::move(val)); - }; - - Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::ExplainDataQueryRequest, Ydb::Table::ExplainDataQueryResponse>( - std::move(request), - extractor, - &Ydb::Table::V1::TableService::Stub::AsyncExplainDataQuery, - DbDriverState_, - INITIAL_DEFERRED_CALL_DELAY, - TRpcRequestSettings::Make(settings), - session.SessionImpl_->GetEndpointKey()); - - return promise.GetFuture(); - } - - static void SetTypedValue(Ydb::TypedValue* protoValue, const TValue& value) { - protoValue->mutable_type()->CopyFrom(TProtoAccessor::GetProto(value.GetType())); - protoValue->mutable_value()->CopyFrom(TProtoAccessor::GetProto(value)); - } - - NThreading::TFuture<std::pair<TPlainStatus, TReadTableStreamProcessorPtr>> ReadTable( - const TString& sessionId, - const TString& path, - const TReadTableSettings& settings) - { - auto request = MakeRequest<Ydb::Table::ReadTableRequest>(); - request.set_session_id(sessionId); - request.set_path(path); - request.set_ordered(settings.Ordered_); - if (settings.RowLimit_) { - request.set_row_limit(settings.RowLimit_.GetRef()); - } - for (const auto& col : settings.Columns_) { - request.add_columns(col); - } - if (settings.UseSnapshot_) { - request.set_use_snapshot( - settings.UseSnapshot_.GetRef() - ? Ydb::FeatureFlag::ENABLED - : Ydb::FeatureFlag::DISABLED); - } - - if (settings.From_) { - const auto& from = settings.From_.GetRef(); - if (from.IsInclusive()) { - SetTypedValue(request.mutable_key_range()->mutable_greater_or_equal(), from.GetValue()); - } else { - SetTypedValue(request.mutable_key_range()->mutable_greater(), from.GetValue()); - } - } - - if (settings.To_) { - const auto& to = settings.To_.GetRef(); - if (to.IsInclusive()) { - SetTypedValue(request.mutable_key_range()->mutable_less_or_equal(), to.GetValue()); - } else { - SetTypedValue(request.mutable_key_range()->mutable_less(), to.GetValue()); - } - } - - auto promise = NewPromise<std::pair<TPlainStatus, TReadTableStreamProcessorPtr>>(); - - Connections_->StartReadStream<Ydb::Table::V1::TableService, Ydb::Table::ReadTableRequest, Ydb::Table::ReadTableResponse>( - std::move(request), - [promise] (TPlainStatus status, TReadTableStreamProcessorPtr processor) mutable { - promise.SetValue(std::make_pair(status, processor)); - }, - &Ydb::Table::V1::TableService::Stub::AsyncStreamReadTable, - DbDriverState_, - TRpcRequestSettings::Make(settings)); - - return promise.GetFuture(); - - } - - TAsyncReadRowsResult ReadRows(const TString& path, TValue&& keys, const TReadRowsSettings& settings) { - auto request = MakeRequest<Ydb::Table::ReadRowsRequest>(); - request.set_path(path); - auto* protoKeys = request.mutable_keys(); - *protoKeys->mutable_type() = TProtoAccessor::GetProto(keys.GetType()); - *protoKeys->mutable_value() = TProtoAccessor::GetProto(keys); - - auto promise = NewPromise<TReadRowsResult>(); - - auto responseCb = [promise] - (Ydb::Table::ReadRowsResponse* response, TPlainStatus status) mutable { - Y_VERIFY(response); - TResultSet resultSet = TResultSet(response->result_set()); - TReadRowsResult val(TStatus(std::move(status)), std::move(resultSet)); - promise.SetValue(std::move(val)); - }; - - Connections_->Run<Ydb::Table::V1::TableService, Ydb::Table::ReadRowsRequest, Ydb::Table::ReadRowsResponse>( - std::move(request), - responseCb, - &Ydb::Table::V1::TableService::Stub::AsyncReadRows, - DbDriverState_, - TRpcRequestSettings::Make(settings), // requestSettings - TEndpointKey() // preferredEndpoint - ); - - return promise.GetFuture(); - } - - TAsyncStatus Close(const TSession::TImpl* sessionImpl, const TCloseSessionSettings& settings) { - auto request = MakeOperationRequest<Ydb::Table::DeleteSessionRequest>(settings); - request.set_session_id(sessionImpl->GetId()); - return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::DeleteSessionRequest, Ydb::Table::DeleteSessionResponse>( - std::move(request), - &Ydb::Table::V1::TableService::Stub::AsyncDeleteSession, - TRpcRequestSettings::Make(settings), - sessionImpl->GetEndpointKey()); - } - - TAsyncStatus CloseInternal(const TSession::TImpl* sessionImpl) { - static const auto internalCloseSessionSettings = TCloseSessionSettings() - .ClientTimeout(TDuration::Seconds(2)); - - auto driver = Connections_; - return Close(sessionImpl, internalCloseSessionSettings) - .Apply([driver{std::move(driver)}](TAsyncStatus status) mutable - { - driver.reset(); - return status; - }); - } - - bool ReturnSession(TSession::TImpl* sessionImpl) { - Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_ACTIVE || - sessionImpl->GetState() == TSession::TImpl::S_IDLE); - - if (RequestMigrator_.DoCheckAndMigrate(sessionImpl)) { - SessionRemovedDueBalancing.Inc(); - return false; - } - - bool needUpdateCounter = sessionImpl->NeedUpdateActiveCounter(); - // Also removes NeedUpdateActiveCounter flag - sessionImpl->MarkIdle(); - sessionImpl->SetTimeInterval(TDuration::Zero()); - if (!SessionPool_.ReturnSession(sessionImpl, needUpdateCounter)) { - sessionImpl->SetNeedUpdateActiveCounter(needUpdateCounter); - return false; - } - return true; - } - - void DeleteSession(TSession::TImpl* sessionImpl) { - if (sessionImpl->NeedUpdateActiveCounter()) { - SessionPool_.DecrementActiveCounter(); - } - - if (sessionImpl->GetId()) { - CloseInternal(sessionImpl); - DbDriverState_->StatCollector.DecSessionsOnHost(sessionImpl->GetEndpoint()); - } - - delete sessionImpl; - } - - ui32 GetSessionRetryLimit() const { - return Settings_.SessionPoolSettings_.RetryLimit_; - } - - void SetStatCollector(const NSdkStats::TStatCollector::TClientStatCollector& collector) { - CacheMissCounter.Set(collector.CacheMiss); - QuerySizeHistogram.Set(collector.QuerySize); - ParamsSizeHistogram.Set(collector.ParamsSize); - RetryOperationStatCollector = collector.RetryOperationStatCollector; - SessionRemovedDueBalancing.Set(collector.SessionRemovedDueBalancing); - RequestMigrated.Set(collector.RequestMigrated); - } - - TAsyncBulkUpsertResult BulkUpsert(const TString& table, TValue&& rows, const TBulkUpsertSettings& settings) { - auto request = MakeOperationRequest<Ydb::Table::BulkUpsertRequest>(settings); - request.set_table(table); - *request.mutable_rows()->mutable_type() = TProtoAccessor::GetProto(rows.GetType()); - *request.mutable_rows()->mutable_value() = std::move(rows.GetProto()); - - auto promise = NewPromise<TBulkUpsertResult>(); - - auto extractor = [promise] - (google::protobuf::Any* any, TPlainStatus status) mutable { - Y_UNUSED(any); - TBulkUpsertResult val(TStatus(std::move(status))); - promise.SetValue(std::move(val)); - }; - - Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::BulkUpsertRequest, Ydb::Table::BulkUpsertResponse>( - std::move(request), - extractor, - &Ydb::Table::V1::TableService::Stub::AsyncBulkUpsert, - DbDriverState_, - INITIAL_DEFERRED_CALL_DELAY, - TRpcRequestSettings::Make(settings)); - - return promise.GetFuture(); - } - - TAsyncBulkUpsertResult BulkUpsert(const TString& table, EDataFormat format, - const TString& data, const TString& schema, const TBulkUpsertSettings& settings) - { - auto request = MakeOperationRequest<Ydb::Table::BulkUpsertRequest>(settings); - request.set_table(table); - if (format == EDataFormat::ApacheArrow) { - request.mutable_arrow_batch_settings()->set_schema(schema); - } else if (format == EDataFormat::CSV) { - auto* csv_settings = request.mutable_csv_settings(); - const auto& format_settings = settings.FormatSettings_; - if (!format_settings.empty()) { - bool ok = csv_settings->ParseFromString(format_settings); - if (!ok) { - return {}; - } - } - } - request.set_data(data); - - auto promise = NewPromise<TBulkUpsertResult>(); - - auto extractor = [promise] - (google::protobuf::Any* any, TPlainStatus status) mutable { - Y_UNUSED(any); - TBulkUpsertResult val(TStatus(std::move(status))); - promise.SetValue(std::move(val)); - }; - - Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::BulkUpsertRequest, Ydb::Table::BulkUpsertResponse>( - std::move(request), - extractor, - &Ydb::Table::V1::TableService::Stub::AsyncBulkUpsert, - DbDriverState_, - INITIAL_DEFERRED_CALL_DELAY, - TRpcRequestSettings::Make(settings)); - - return promise.GetFuture(); - } - - TFuture<std::pair<TPlainStatus, TScanQueryProcessorPtr>> StreamExecuteScanQueryInternal(const TString& query, - const ::google::protobuf::Map<TString, Ydb::TypedValue>* params, - const TStreamExecScanQuerySettings& settings) - { - auto request = MakeRequest<Ydb::Table::ExecuteScanQueryRequest>(); - request.mutable_query()->set_yql_text(query); - - if (params) { - *request.mutable_parameters() = *params; - } - - if (settings.Explain_) { - request.set_mode(Ydb::Table::ExecuteScanQueryRequest::MODE_EXPLAIN); - } else { - request.set_mode(Ydb::Table::ExecuteScanQueryRequest::MODE_EXEC); - } - - request.set_collect_stats(GetStatsCollectionMode(settings.CollectQueryStats_)); - - auto promise = NewPromise<std::pair<TPlainStatus, TScanQueryProcessorPtr>>(); - - Connections_->StartReadStream< - Ydb::Table::V1::TableService, - Ydb::Table::ExecuteScanQueryRequest, - Ydb::Table::ExecuteScanQueryPartialResponse> - ( - std::move(request), - [promise] (TPlainStatus status, TScanQueryProcessorPtr processor) mutable { - promise.SetValue(std::make_pair(status, processor)); - }, - &Ydb::Table::V1::TableService::Stub::AsyncStreamExecuteScanQuery, - DbDriverState_, - TRpcRequestSettings::Make(settings) - ); - - return promise.GetFuture(); - } - - TAsyncScanQueryPartIterator StreamExecuteScanQuery(const TString& query, - const ::google::protobuf::Map<TString, Ydb::TypedValue>* params, - const TStreamExecScanQuerySettings& settings) - { - auto promise = NewPromise<TScanQueryPartIterator>(); - - auto iteratorCallback = [promise](TFuture<std::pair<TPlainStatus, - TTableClient::TImpl::TScanQueryProcessorPtr>> future) mutable - { - Y_ASSERT(future.HasValue()); - auto pair = future.ExtractValue(); - promise.SetValue(TScanQueryPartIterator( - pair.second - ? std::make_shared<TScanQueryPartIterator::TReaderImpl>(pair.second, pair.first.Endpoint) - : nullptr, - std::move(pair.first)) - ); - }; - - StreamExecuteScanQueryInternal(query, params, settings).Subscribe(iteratorCallback); - return promise.GetFuture(); - } - - static void CloseAndDeleteSession( - std::unique_ptr<TSession::TImpl>&& impl, - std::shared_ptr<TTableClient::TImpl> client); -public: - TClientSettings Settings_; - -private: - static void SetParams( - ::google::protobuf::Map<TString, Ydb::TypedValue>* params, - Ydb::Table::ExecuteDataQueryRequest* request) - { - if (params) { - request->mutable_parameters()->swap(*params); - } - } - - static void SetParams( - const ::google::protobuf::Map<TString, Ydb::TypedValue>& params, - Ydb::Table::ExecuteDataQueryRequest* request) - { - *request->mutable_parameters() = params; - } - - static void CollectParams( - ::google::protobuf::Map<TString, Ydb::TypedValue>* params, - NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> histgoram) - { - - if (params && histgoram.IsCollecting()) { - size_t size = 0; - for (auto& keyvalue: *params) { - size += keyvalue.second.ByteSizeLong(); - } - histgoram.Record(size); - } - } - - static void CollectParams( - const ::google::protobuf::Map<TString, Ydb::TypedValue>& params, - NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> histgoram) - { - - if (histgoram.IsCollecting()) { - size_t size = 0; - for (auto& keyvalue: params) { - size += keyvalue.second.ByteSizeLong(); - } - histgoram.Record(size); - } - } - - static void CollectQuerySize(const TString& query, NSdkStats::TAtomicHistogram<::NMonitoring::THistogram>& querySizeHistogram) { - if (querySizeHistogram.IsCollecting()) { - querySizeHistogram.Record(query.size()); - } - } - - static void CollectQuerySize(const TDataQuery&, NSdkStats::TAtomicHistogram<::NMonitoring::THistogram>&) {} - - template <typename TQueryType, typename TParamsType> - TAsyncDataQueryResult ExecuteDataQueryInternal(const TSession& session, const TQueryType& query, - const TTxControl& txControl, TParamsType params, - const TExecDataQuerySettings& settings, bool fromCache) - { - auto request = MakeOperationRequest<Ydb::Table::ExecuteDataQueryRequest>(settings); - request.set_session_id(session.GetId()); - auto txControlProto = request.mutable_tx_control(); - txControlProto->set_commit_tx(txControl.CommitTx_); - if (txControl.TxId_) { - txControlProto->set_tx_id(*txControl.TxId_); - } else { - SetTxSettings(txControl.BeginTx_, txControlProto->mutable_begin_tx()); - } - - request.set_collect_stats(GetStatsCollectionMode(settings.CollectQueryStats_)); - - SetQuery(query, request.mutable_query()); - CollectQuerySize(query, QuerySizeHistogram); - - SetParams(params, &request); - CollectParams(params, ParamsSizeHistogram); - - SetQueryCachePolicy(query, settings, request.mutable_query_cache_policy()); - - auto promise = NewPromise<TDataQueryResult>(); - bool keepInCache = settings.KeepInQueryCache_ && settings.KeepInQueryCache_.GetRef(); - - // We don't want to delay call of TSession dtor, so we can't capture it by copy - // otherwise we break session pool and other clients logic. - // Same problem with TDataQuery and TTransaction - // - // The fast solution is: - // - create copy of TSession out of lambda - // - capture pointer - // - call free just before SetValue call - auto sessionPtr = new TSession(session); - auto extractor = [promise, sessionPtr, query, fromCache, keepInCache] - (google::protobuf::Any* any, TPlainStatus status) mutable { - TVector<TResultSet> res; - TMaybe<TTransaction> tx; - TMaybe<TDataQuery> dataQuery; - TMaybe<TQueryStats> queryStats; - - auto queryText = GetQueryText(query); - if (any) { - Ydb::Table::ExecuteQueryResult result; - any->UnpackTo(&result); - - for (size_t i = 0; i < result.result_setsSize(); i++) { - res.push_back(TResultSet(*result.mutable_result_sets(i))); - } - - if (result.has_tx_meta()) { - tx = TTransaction(*sessionPtr, result.tx_meta().id()); - } - - if (result.has_query_meta()) { - if (queryText) { - auto& query_meta = result.query_meta(); - dataQuery = TDataQuery(*sessionPtr, *queryText, query_meta.id(), query_meta.parameters_types()); - } - } - - if (result.has_query_stats()) { - queryStats = TQueryStats(result.query_stats()); - } - } - - if (keepInCache && dataQuery && queryText) { - sessionPtr->SessionImpl_->AddQueryToCache(*dataQuery); - } - - TDataQueryResult dataQueryResult(TStatus(std::move(status)), - std::move(res), tx, dataQuery, fromCache, queryStats); - - delete sessionPtr; - tx.Clear(); - dataQuery.Clear(); - promise.SetValue(std::move(dataQueryResult)); - }; - - Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::ExecuteDataQueryRequest, Ydb::Table::ExecuteDataQueryResponse>( - std::move(request), - extractor, - &Ydb::Table::V1::TableService::Stub::AsyncExecuteDataQuery, - DbDriverState_, - INITIAL_DEFERRED_CALL_DELAY, - TRpcRequestSettings::Make(settings), - session.SessionImpl_->GetEndpointKey()); - - return promise.GetFuture(); - } - - static void SetTxSettings(const TTxSettings& txSettings, Ydb::Table::TransactionSettings* proto) - { - switch (txSettings.Mode_) { - case TTxSettings::TS_SERIALIZABLE_RW: - proto->mutable_serializable_read_write(); - break; - case TTxSettings::TS_ONLINE_RO: - proto->mutable_online_read_only()->set_allow_inconsistent_reads( - txSettings.OnlineSettings_.AllowInconsistentReads_); - break; - case TTxSettings::TS_STALE_RO: - proto->mutable_stale_read_only(); - break; - case TTxSettings::TS_SNAPSHOT_RO: - proto->mutable_snapshot_read_only(); - break; - default: - throw TContractViolation("Unexpected transaction mode."); - } - } - - static void SetQuery(const TString& queryText, Ydb::Table::Query* query) { - query->set_yql_text(queryText); - } - - static void SetQuery(const TDataQuery& queryData, Ydb::Table::Query* query) { - query->set_id(queryData.GetId()); - } - - static void SetQueryCachePolicy(const TString&, const TExecDataQuerySettings& settings, - Ydb::Table::QueryCachePolicy* queryCachePolicy) - { - queryCachePolicy->set_keep_in_cache(settings.KeepInQueryCache_ ? settings.KeepInQueryCache_.GetRef() : false); - } - - static void SetQueryCachePolicy(const TDataQuery&, const TExecDataQuerySettings& settings, - Ydb::Table::QueryCachePolicy* queryCachePolicy) { - queryCachePolicy->set_keep_in_cache(settings.KeepInQueryCache_ ? settings.KeepInQueryCache_.GetRef() : true); - } - - static TMaybe<TString> GetQueryText(const TString& queryText) { - return queryText; - } - - static TMaybe<TString> GetQueryText(const TDataQuery& queryData) { - return queryData.GetText(); - } - -public: - NSdkStats::TAtomicCounter<::NMonitoring::TRate> CacheMissCounter; - NSdkStats::TStatCollector::TClientRetryOperationStatCollector RetryOperationStatCollector; - NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> QuerySizeHistogram; - NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> ParamsSizeHistogram; - NSdkStats::TAtomicCounter<::NMonitoring::TRate> SessionRemovedDueBalancing; - NSdkStats::TAtomicCounter<::NMonitoring::TRate> RequestMigrated; - -private: - TSessionPoolImpl SessionPool_; - TRequestMigrator RequestMigrator_; - static const TKeepAliveSettings KeepAliveSettings; -}; - -const TKeepAliveSettings TTableClient::TImpl::KeepAliveSettings = TKeepAliveSettings().ClientTimeout(KEEP_ALIVE_CLIENT_TIMEOUT); - -TSessionPoolImpl::TSessionPoolImpl(ui32 maxActiveSessions) - : Closed_(false) - , ActiveSessions_(0) - , MaxActiveSessions_(maxActiveSessions) -{} - -void TTableClient::TImpl::CloseAndDeleteSession(std::unique_ptr<TSession::TImpl>&& impl, - std::shared_ptr<TTableClient::TImpl> client) { - std::shared_ptr<TSession::TImpl> deleteSession( - impl.release(), - TSession::TImpl::GetSmartDeleter(client)); - - deleteSession->MarkBroken(); -} - -void TSessionPoolImpl::CreateFakeSession( - NThreading::TPromise<TCreateSessionResult>& promise, - std::shared_ptr<TTableClient::TImpl> client) -{ - TSession session(client, "", ""); - // Mark broken to prevent returning to session pool - session.SessionImpl_->MarkBroken(); - TCreateSessionResult val( - TStatus( - TPlainStatus( - EStatus::CLIENT_RESOURCE_EXHAUSTED, - "Active sessions limit exceeded" - ) - ), - std::move(session) - ); - - client->ScheduleTaskUnsafe([promise, val{std::move(val)}]() mutable { - promise.SetValue(std::move(val)); - }, TDuration()); -} - -TAsyncCreateSessionResult TSessionPoolImpl::GetSession( - std::shared_ptr<TTableClient::TImpl> client, - const TCreateSessionSettings& settings) -{ - auto createSessionPromise = NewPromise<TCreateSessionResult>(); - std::unique_ptr<TSession::TImpl> sessionImpl; - bool needUpdateActiveSessionCounter = false; - bool returnFakeSession = false; - { - std::lock_guard guard(Mtx_); - if (MaxActiveSessions_) { - if (ActiveSessions_ < MaxActiveSessions_) { - ActiveSessions_++; - needUpdateActiveSessionCounter = true; - } else { - returnFakeSession = true; - } - } else { - ActiveSessions_++; - needUpdateActiveSessionCounter = true; - } - if (!Sessions_.empty()) { - auto it = std::prev(Sessions_.end()); - sessionImpl = std::move(it->second); - Sessions_.erase(it); - } - UpdateStats(); - } - if (returnFakeSession) { - FakeSessionsCounter_.Inc(); - CreateFakeSession(createSessionPromise, client); - return createSessionPromise.GetFuture(); - } else if (sessionImpl) { - Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_IDLE); - Y_VERIFY(!sessionImpl->GetTimeInterval()); - Y_VERIFY(needUpdateActiveSessionCounter); - sessionImpl->MarkActive(); - sessionImpl->SetNeedUpdateActiveCounter(true); - - TCreateSessionResult val(TStatus(TPlainStatus()), - TSession(client, std::shared_ptr<TSession::TImpl>( - sessionImpl.release(), - TSession::TImpl::GetSmartDeleter(client)))); - - client->ScheduleTaskUnsafe([createSessionPromise, val{std::move(val)}]() mutable { - createSessionPromise.SetValue(std::move(val)); - }, TDuration()); - - return createSessionPromise.GetFuture(); - } else { - const auto& sessionResult = client->CreateSession(settings, false); - sessionResult.Subscribe(TSession::TImpl::GetSessionInspector(createSessionPromise, client, settings, 0, needUpdateActiveSessionCounter)); - return createSessionPromise.GetFuture(); - } -} - -bool TSessionPoolImpl::DropSessionOnEndpoint(std::shared_ptr<TTableClient::TImpl> client, ui64 nodeId) { - std::unique_ptr<TSession::TImpl> sessionImpl; - { - std::lock_guard guard(Mtx_); - for (auto it = Sessions_.begin(); it != Sessions_.end(); it++) { - if (it->second->GetEndpointKey().GetNodeId() == nodeId) { - sessionImpl = std::move(it->second); - Sessions_.erase(it); - break; - } - } - } - if (!sessionImpl) - return false; - - auto deleteFn = TSession::TImpl::GetSmartDeleter(client); - deleteFn(sessionImpl.release()); - return true; -} - -bool TSessionPoolImpl::ReturnSession(TSession::TImpl* impl, bool active) { - { - std::lock_guard guard(Mtx_); - if (Closed_) - return false; - Sessions_.emplace(std::make_pair(impl->GetTimeToTouchFast(), impl)); - if (active) { - Y_VERIFY(ActiveSessions_); - ActiveSessions_--; - impl->SetNeedUpdateActiveCounter(false); - } - UpdateStats(); - } - return true; -} - -void TSessionPoolImpl::DecrementActiveCounter() { - std::lock_guard guard(Mtx_); - Y_VERIFY(ActiveSessions_); - ActiveSessions_--; - UpdateStats(); -} - -void TSessionPoolImpl::Drain(std::function<bool(std::unique_ptr<TSession::TImpl>&&)> cb, bool close) { - std::lock_guard guard(Mtx_); - Closed_ = close; - for (auto it = Sessions_.begin(); it != Sessions_.end();) { - const bool cont = cb(std::move(it->second)); - it = Sessions_.erase(it); - if (!cont) - break; - } - UpdateStats(); -} - -TPeriodicCb TSessionPoolImpl::CreatePeriodicTask(std::weak_ptr<TTableClient::TImpl> weakClient, - TKeepAliveCmd&& cmd, TDeletePredicate&& deletePredicate) -{ - auto periodicCb = [this, weakClient, cmd=std::move(cmd), deletePredicate=std::move(deletePredicate)](NYql::TIssues&&, EStatus status) { - if (status != EStatus::SUCCESS) { - return false; - } - - auto strongClient = weakClient.lock(); - if (!strongClient) { - // No more clients alive - no need to run periodic, - // moreover it is unsafe to touch this ptr! - return false; - } else { - auto keepAliveBatchSize = PERIODIC_ACTION_BATCH_SIZE; - TVector<std::unique_ptr<TSession::TImpl>> sessionsToTouch; - sessionsToTouch.reserve(keepAliveBatchSize); - TVector<std::unique_ptr<TSession::TImpl>> sessionsToDelete; - sessionsToDelete.reserve(keepAliveBatchSize); - auto now = TInstant::Now(); - { - std::lock_guard guard(Mtx_); - auto& sessions = Sessions_; - - auto it = sessions.begin(); - while (it != sessions.end() && keepAliveBatchSize--) { - if (now < it->second->GetTimeToTouchFast()) - break; - - if (deletePredicate(it->second.get(), strongClient.get(), sessions.size())) { - sessionsToDelete.emplace_back(std::move(it->second)); - } else { - sessionsToTouch.emplace_back(std::move(it->second)); - } - sessions.erase(it++); - } - UpdateStats(); - } - - for (auto& sessionImpl : sessionsToTouch) { - if (sessionImpl) { - Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_IDLE); - TSession session(strongClient, std::shared_ptr<TSession::TImpl>( - sessionImpl.release(), - TSession::TImpl::GetSmartDeleter(strongClient))); - cmd(session); - } - } - - for (auto& sessionImpl : sessionsToDelete) { - if (sessionImpl) { - Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_IDLE); - TTableClient::TImpl::CloseAndDeleteSession(std::move(sessionImpl), strongClient); - } - } - } - - return true; - }; - return periodicCb; -} - -i64 TSessionPoolImpl::GetActiveSessions() const { - std::lock_guard guard(Mtx_); - return ActiveSessions_; -} - -i64 TSessionPoolImpl::GetActiveSessionsLimit() const { - return MaxActiveSessions_; -} - -i64 TSessionPoolImpl::GetCurrentPoolSize() const { - std::lock_guard guard(Mtx_); - return Sessions_.size(); -} static bool IsSessionStatusRetriable(const TCreateSessionResult& res) { switch (res.GetStatus()) { @@ -3007,17 +1295,6 @@ TSessionInspectorFn TSession::TImpl::GetSessionInspector( }; } -void TSessionPoolImpl::SetStatCollector(NSdkStats::TStatCollector::TSessionPoolStatCollector statCollector) { - ActiveSessionsCounter_.Set(statCollector.ActiveSessions); - InPoolSessionsCounter_.Set(statCollector.InPoolSessions); - FakeSessionsCounter_.Set(statCollector.FakeSessions); -} - -void TSessionPoolImpl::UpdateStats() { - ActiveSessionsCounter_.Apply(ActiveSessions_); - InPoolSessionsCounter_.Apply(Sessions_.size()); -} - TTableClient::TTableClient(const TDriver& driver, const TClientSettings& settings) : Impl_(new TImpl(CreateInternalInterface(driver), settings)) { Impl_->StartPeriodicSessionPoolTask(); @@ -3504,13 +1781,14 @@ static void ConvertCreateTableSettingsToProto(const TCreateTableSettings& settin //////////////////////////////////////////////////////////////////////////////// -TSession::TSession(std::shared_ptr<TTableClient::TImpl> client, const TString& sessionId, const TString& endpointId) +TSession::TSession(std::shared_ptr<TTableClient::TImpl> client, const TString& sessionId, const TString& endpointId, bool isOwnedBySessionPool) : Client_(client) , SessionImpl_(new TSession::TImpl( sessionId, endpointId, client->Settings_.UseQueryCache_, - client->Settings_.QueryCacheSize_), + client->Settings_.QueryCacheSize_, + isOwnedBySessionPool), TSession::TImpl::GetSmartDeleter(client)) { if (endpointId) { diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.h b/ydb/public/sdk/cpp/client/ydb_table/table.h index b402fa1e15..d44500349e 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.h +++ b/ydb/public/sdk/cpp/client/ydb_table/table.h @@ -849,7 +849,6 @@ class TDescribeTableResult; class TBeginTransactionResult; class TCommitTransactionResult; class TKeepAliveResult; -class TSessionPoolImpl; class TBulkUpsertResult; class TReadRowsResult; class TScanQueryPartIterator; @@ -978,6 +977,7 @@ struct TStreamExecScanQuerySettings : public TRequestSettings<TStreamExecScanQue }; class TSession; +class TSessionPool; struct TRetryState; enum class EDataFormat { @@ -988,7 +988,7 @@ enum class EDataFormat { class TTableClient { friend class TSession; friend class TTransaction; - friend class TSessionPoolImpl; + friend class TSessionPool; friend class TRetryOperationContext; public: @@ -1579,7 +1579,7 @@ class TSession { friend class TTableClient; friend class TDataQuery; friend class TTransaction; - friend class TSessionPoolImpl; + friend class TSessionPool; public: //! The following methods perform corresponding calls. @@ -1647,7 +1647,7 @@ public: class TImpl; private: - TSession(std::shared_ptr<TTableClient::TImpl> client, const TString& sessionId, const TString& endpointId); + TSession(std::shared_ptr<TTableClient::TImpl> client, const TString& sessionId, const TString& endpointId, bool isOwnedBySessionPool); TSession(std::shared_ptr<TTableClient::TImpl> client, std::shared_ptr<TSession::TImpl> SessionImpl_); std::shared_ptr<TTableClient::TImpl> Client_; diff --git a/ydb/services/ydb/CMakeLists.darwin-x86_64.txt b/ydb/services/ydb/CMakeLists.darwin-x86_64.txt index 76c589603f..2077daf05e 100644 --- a/ydb/services/ydb/CMakeLists.darwin-x86_64.txt +++ b/ydb/services/ydb/CMakeLists.darwin-x86_64.txt @@ -9,6 +9,7 @@ find_package(OpenSSL REQUIRED) add_subdirectory(index_ut) add_subdirectory(sdk_credprovider_ut) +add_subdirectory(sdk_sessions_pool_ut) add_subdirectory(sdk_sessions_ut) add_subdirectory(table_split_ut) add_subdirectory(ut) diff --git a/ydb/services/ydb/CMakeLists.linux-aarch64.txt b/ydb/services/ydb/CMakeLists.linux-aarch64.txt index e481961748..3ab9453018 100644 --- a/ydb/services/ydb/CMakeLists.linux-aarch64.txt +++ b/ydb/services/ydb/CMakeLists.linux-aarch64.txt @@ -9,6 +9,7 @@ find_package(OpenSSL REQUIRED) add_subdirectory(index_ut) add_subdirectory(sdk_credprovider_ut) +add_subdirectory(sdk_sessions_pool_ut) add_subdirectory(sdk_sessions_ut) add_subdirectory(table_split_ut) add_subdirectory(ut) diff --git a/ydb/services/ydb/CMakeLists.linux-x86_64.txt b/ydb/services/ydb/CMakeLists.linux-x86_64.txt index e481961748..3ab9453018 100644 --- a/ydb/services/ydb/CMakeLists.linux-x86_64.txt +++ b/ydb/services/ydb/CMakeLists.linux-x86_64.txt @@ -9,6 +9,7 @@ find_package(OpenSSL REQUIRED) add_subdirectory(index_ut) add_subdirectory(sdk_credprovider_ut) +add_subdirectory(sdk_sessions_pool_ut) add_subdirectory(sdk_sessions_ut) add_subdirectory(table_split_ut) add_subdirectory(ut) diff --git a/ydb/services/ydb/CMakeLists.windows-x86_64.txt b/ydb/services/ydb/CMakeLists.windows-x86_64.txt index 76c589603f..2077daf05e 100644 --- a/ydb/services/ydb/CMakeLists.windows-x86_64.txt +++ b/ydb/services/ydb/CMakeLists.windows-x86_64.txt @@ -9,6 +9,7 @@ find_package(OpenSSL REQUIRED) add_subdirectory(index_ut) add_subdirectory(sdk_credprovider_ut) +add_subdirectory(sdk_sessions_pool_ut) add_subdirectory(sdk_sessions_ut) add_subdirectory(table_split_ut) add_subdirectory(ut) diff --git a/ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.darwin-x86_64.txt b/ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..76c1e3815f --- /dev/null +++ b/ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,81 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-services-ydb-sdk_sessions_pool_ut) +target_compile_options(ydb-services-ydb-sdk_sessions_pool_ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-services-ydb-sdk_sessions_pool_ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ydb +) +target_link_libraries(ydb-services-ydb-sdk_sessions_pool_ut PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + ydb-services-ydb + cpp-grpc-client + core-testlib-default + ydb-core-testlib + cpp-client-ydb_table +) +target_link_options(ydb-services-ydb-sdk_sessions_pool_ut PRIVATE + -Wl,-platform_version,macos,11.0,11.0 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(ydb-services-ydb-sdk_sessions_pool_ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ydb/sdk_sessions_pool_ut/sdk_sessions_pool_ut.cpp +) +set_property( + TARGET + ydb-services-ydb-sdk_sessions_pool_ut + PROPERTY + SPLIT_FACTOR + 10 +) +add_yunittest( + NAME + ydb-services-ydb-sdk_sessions_pool_ut + TEST_TARGET + ydb-services-ydb-sdk_sessions_pool_ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-services-ydb-sdk_sessions_pool_ut + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-services-ydb-sdk_sessions_pool_ut + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-services-ydb-sdk_sessions_pool_ut + PROPERTY + TIMEOUT + 300 +) +target_allocator(ydb-services-ydb-sdk_sessions_pool_ut + system_allocator +) +vcs_info(ydb-services-ydb-sdk_sessions_pool_ut) diff --git a/ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.linux-aarch64.txt b/ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..307872510d --- /dev/null +++ b/ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.linux-aarch64.txt @@ -0,0 +1,84 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-services-ydb-sdk_sessions_pool_ut) +target_compile_options(ydb-services-ydb-sdk_sessions_pool_ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-services-ydb-sdk_sessions_pool_ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ydb +) +target_link_libraries(ydb-services-ydb-sdk_sessions_pool_ut PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-testing-unittest_main + ydb-services-ydb + cpp-grpc-client + core-testlib-default + ydb-core-testlib + cpp-client-ydb_table +) +target_link_options(ydb-services-ydb-sdk_sessions_pool_ut PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-services-ydb-sdk_sessions_pool_ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ydb/sdk_sessions_pool_ut/sdk_sessions_pool_ut.cpp +) +set_property( + TARGET + ydb-services-ydb-sdk_sessions_pool_ut + PROPERTY + SPLIT_FACTOR + 10 +) +add_yunittest( + NAME + ydb-services-ydb-sdk_sessions_pool_ut + TEST_TARGET + ydb-services-ydb-sdk_sessions_pool_ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-services-ydb-sdk_sessions_pool_ut + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-services-ydb-sdk_sessions_pool_ut + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-services-ydb-sdk_sessions_pool_ut + PROPERTY + TIMEOUT + 300 +) +target_allocator(ydb-services-ydb-sdk_sessions_pool_ut + cpp-malloc-jemalloc +) +vcs_info(ydb-services-ydb-sdk_sessions_pool_ut) diff --git a/ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.linux-x86_64.txt b/ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..d3a3ab6950 --- /dev/null +++ b/ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.linux-x86_64.txt @@ -0,0 +1,86 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-services-ydb-sdk_sessions_pool_ut) +target_compile_options(ydb-services-ydb-sdk_sessions_pool_ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-services-ydb-sdk_sessions_pool_ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ydb +) +target_link_libraries(ydb-services-ydb-sdk_sessions_pool_ut PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + ydb-services-ydb + cpp-grpc-client + core-testlib-default + ydb-core-testlib + cpp-client-ydb_table +) +target_link_options(ydb-services-ydb-sdk_sessions_pool_ut PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-services-ydb-sdk_sessions_pool_ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ydb/sdk_sessions_pool_ut/sdk_sessions_pool_ut.cpp +) +set_property( + TARGET + ydb-services-ydb-sdk_sessions_pool_ut + PROPERTY + SPLIT_FACTOR + 10 +) +add_yunittest( + NAME + ydb-services-ydb-sdk_sessions_pool_ut + TEST_TARGET + ydb-services-ydb-sdk_sessions_pool_ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-services-ydb-sdk_sessions_pool_ut + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-services-ydb-sdk_sessions_pool_ut + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-services-ydb-sdk_sessions_pool_ut + PROPERTY + TIMEOUT + 300 +) +target_allocator(ydb-services-ydb-sdk_sessions_pool_ut + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache +) +vcs_info(ydb-services-ydb-sdk_sessions_pool_ut) diff --git a/ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.txt b/ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.windows-x86_64.txt b/ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..60cb26ee02 --- /dev/null +++ b/ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.windows-x86_64.txt @@ -0,0 +1,74 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-services-ydb-sdk_sessions_pool_ut) +target_compile_options(ydb-services-ydb-sdk_sessions_pool_ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-services-ydb-sdk_sessions_pool_ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ydb +) +target_link_libraries(ydb-services-ydb-sdk_sessions_pool_ut PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + ydb-services-ydb + cpp-grpc-client + core-testlib-default + ydb-core-testlib + cpp-client-ydb_table +) +target_sources(ydb-services-ydb-sdk_sessions_pool_ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ydb/sdk_sessions_pool_ut/sdk_sessions_pool_ut.cpp +) +set_property( + TARGET + ydb-services-ydb-sdk_sessions_pool_ut + PROPERTY + SPLIT_FACTOR + 10 +) +add_yunittest( + NAME + ydb-services-ydb-sdk_sessions_pool_ut + TEST_TARGET + ydb-services-ydb-sdk_sessions_pool_ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-services-ydb-sdk_sessions_pool_ut + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-services-ydb-sdk_sessions_pool_ut + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-services-ydb-sdk_sessions_pool_ut + PROPERTY + TIMEOUT + 300 +) +target_allocator(ydb-services-ydb-sdk_sessions_pool_ut + system_allocator +) +vcs_info(ydb-services-ydb-sdk_sessions_pool_ut) diff --git a/ydb/services/ydb/sdk_sessions_pool_ut/sdk_sessions_pool_ut.cpp b/ydb/services/ydb/sdk_sessions_pool_ut/sdk_sessions_pool_ut.cpp new file mode 100644 index 0000000000..97ae2d6aa3 --- /dev/null +++ b/ydb/services/ydb/sdk_sessions_pool_ut/sdk_sessions_pool_ut.cpp @@ -0,0 +1,448 @@ +#include "ydb_common_ut.h" + +#include <ydb/public/sdk/cpp/client/ydb_table/table.h> +#include <ydb/public/api/grpc/ydb_table_v1.grpc.pb.h> + +#include <util/thread/pool.h> + +#include <random> +#include <thread> + +using namespace NYdb; +using namespace NYdb::NTable; + +class TDefaultTestSetup { +public: + TDefaultTestSetup(ui32 maxActiveSessions) + : Driver_(NYdb::TDriver( + TDriverConfig().SetEndpoint( + TStringBuilder() << "localhost:" << Server_.GetPort() + ) + )) + , Client_( + Driver_, + TClientSettings().SessionPoolSettings( + TSessionPoolSettings() + .MaxActiveSessions(maxActiveSessions) + .KeepAliveIdleThreshold(TDuration::MilliSeconds(10)) + .CloseIdleThreshold(TDuration::MilliSeconds(10)) + ) + ) + { + } + + ~TDefaultTestSetup() { + Driver_.Stop(true); + } + + NYdb::NTable::TTableClient& GetClient() { + return Client_; + } + +private: + TKikimrWithGrpcAndRootSchema Server_; + NYdb::TDriver Driver_; + NYdb::NTable::TTableClient Client_; +}; + + +enum class EAction: ui8 { + CreateFuture, + ExtractValue, + Return +}; +using TPlan = TVector<std::pair<EAction, ui32>>; + + +void CheckPlan(TPlan plan) { + THashMap<ui32, EAction> sessions; + for (const auto& [action, sessionId]: plan) { + if (action == EAction::CreateFuture) { + UNIT_ASSERT(!sessions.contains(sessionId)); + } else { + UNIT_ASSERT(sessions.contains(sessionId)); + switch (sessions.at(sessionId)) { + case EAction::CreateFuture: { + UNIT_ASSERT(action == EAction::ExtractValue); + break; + } + case EAction::ExtractValue: { + UNIT_ASSERT(action == EAction::Return); + break; + } + default: { + UNIT_ASSERT(false); + } + } + } + sessions[sessionId] = action; + } +} + +void RunPlan(const TPlan& plan, NYdb::NTable::TTableClient& client) { + THashMap<ui32, NThreading::TFuture<NYdb::NTable::TCreateSessionResult>> sessionFutures; + THashMap<ui32, NYdb::NTable::TCreateSessionResult> sessions; + + ui32 requestedSessions = 0; + + for (const auto& [action, sessionId]: plan) { + switch (action) { + case EAction::CreateFuture: { + sessionFutures.emplace(sessionId, client.GetSession()); + ++requestedSessions; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + if (requestedSessions > client.GetActiveSessionsLimit()) { + UNIT_ASSERT(client.GetActiveSessionCount() == client.GetActiveSessionsLimit()); + } + UNIT_ASSERT(!sessionFutures.at(sessionId).HasValue()); + break; + } + case EAction::ExtractValue: { + auto it = sessionFutures.find(sessionId); + auto session = it->second.ExtractValueSync(); + sessionFutures.erase(it); + sessions.emplace(sessionId, std::move(session)); + break; + } + case EAction::Return: { + sessions.erase(sessionId); + --requestedSessions; + break; + } + } + UNIT_ASSERT(client.GetActiveSessionCount() <= client.GetActiveSessionsLimit()); + UNIT_ASSERT(client.GetActiveSessionCount() >= static_cast<i64>(sessions.size())); + UNIT_ASSERT(client.GetActiveSessionCount() <= static_cast<i64>(sessions.size() + sessionFutures.size())); + } +} + +int GetRand(std::mt19937& rng, int min, int max) { + std::uniform_int_distribution<std::mt19937::result_type> dist(min, max); + return dist(rng); +} + + +TPlan GenerateRandomPlan(ui32 numSessions) { + TPlan plan; + std::random_device dev; + std::mt19937 rng(dev()); + + for (ui32 i = 0; i < numSessions; ++i) { + std::uniform_int_distribution<std::mt19937::result_type> dist(0, plan.size()); + ui32 prevPos = 0; + for (EAction action: {EAction::CreateFuture, EAction::ExtractValue, EAction::Return}) { + int pos = GetRand(rng, prevPos, plan.size()); + plan.emplace(plan.begin() + pos, std::make_pair(action, i)); + prevPos = pos + 1; + } + } + return plan; +} + + +Y_UNIT_TEST_SUITE(YdbSdkSessionsPool) { + Y_UNIT_TEST(Get1Session) { + TDefaultTestSetup setup(1); + auto& client = setup.GetClient(); + + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionsLimit(), 1); + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); + UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), 0); + + { + auto session = client.GetSession().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1); + UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), 0); + } + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); + UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), 1); + } + + void TestWaitQueue(NYdb::NTable::TTableClient& client, ui32 activeSessionsLimit) { + std::vector<NThreading::TFuture<NYdb::NTable::TCreateSessionResult>> sessionFutures; + std::vector<NYdb::NTable::TCreateSessionResult> sessions; + + // exhaust the pool + for (ui32 i = 0; i < activeSessionsLimit; ++i) { + sessions.emplace_back(client.GetSession().ExtractValueSync()); + } + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), activeSessionsLimit); + + // next should be in the wait queue + for (ui32 i = 0; i < activeSessionsLimit * 10; ++i) { + sessionFutures.emplace_back(client.GetSession()); + } + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), activeSessionsLimit); + + // next should be a fake session + { + auto brokenSession = client.GetSession().ExtractValueSync(); + UNIT_ASSERT(!brokenSession.IsSuccess()); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + for (auto& sessionFuture: sessionFutures) { + UNIT_ASSERT(!sessionFuture.HasValue()); + } + + for (auto& sessionFuture: sessionFutures) { + sessions.erase(sessions.begin()); + sessions.emplace_back(sessionFuture.ExtractValueSync()); + } + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), activeSessionsLimit); + } + + Y_UNIT_TEST(WaitQueue1) { + ui32 activeSessionsLimit = 1; + + TDefaultTestSetup setup(activeSessionsLimit); + auto& client = setup.GetClient(); + + TestWaitQueue(client, activeSessionsLimit); + } + + Y_UNIT_TEST(WaitQueue10) { + ui32 activeSessionsLimit = 10; + + TDefaultTestSetup setup(activeSessionsLimit); + auto& client = setup.GetClient(); + + TestWaitQueue(client, activeSessionsLimit); + } + + Y_UNIT_TEST(RunSmallPlan) { + TDefaultTestSetup setup(1); + auto& client = setup.GetClient(); + + TPlan plan{ + {EAction::CreateFuture, 1}, + {EAction::ExtractValue, 1}, + {EAction::CreateFuture, 2}, + {EAction::Return, 1}, + {EAction::ExtractValue, 2}, + {EAction::Return, 2} + }; + CheckPlan(plan); + RunPlan(plan, client); + + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); + UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), 1); + } + + Y_UNIT_TEST(CustomPlan) { + TDefaultTestSetup setup(1); + auto& client = setup.GetClient(); + + TPlan plan{ + {EAction::CreateFuture, 1} + }; + CheckPlan(plan); + RunPlan(plan, client); + + std::this_thread::sleep_for(std::chrono::milliseconds(10000)); + + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); + } + + ui32 RunStressTestSync(ui32 n, ui32 activeSessionsLimit, NYdb::NTable::TTableClient& client) { + std::vector<NThreading::TFuture<NYdb::NTable::TCreateSessionResult>> sessionFutures; + std::vector<NYdb::NTable::TCreateSessionResult> sessions; + std::mt19937 rng(0); + ui32 successCount = 0; + + for (ui32 i = 0; i < activeSessionsLimit * 12; ++i) { + sessionFutures.emplace_back(client.GetSession()); + } + + for (ui32 i = 0; i < n; ++i) { + switch (static_cast<EAction>(GetRand(rng, 0, 2))) { + case EAction::CreateFuture: { + sessionFutures.emplace_back(client.GetSession()); + break; + } + case EAction::ExtractValue: { + if (sessionFutures.empty()) { + break; + } + auto ind = GetRand(rng, 0, sessionFutures.size() - 1); + auto sessionFuture = sessionFutures[ind]; + if (sessionFuture.HasValue()) { + auto session = sessionFuture.ExtractValueSync(); + if (session.IsSuccess()) { + ++successCount; + } + sessions.emplace_back(std::move(session)); + sessionFutures.erase(sessionFutures.begin() + ind); + break; + } + break; + } + case EAction::Return: { + if (sessions.empty()) { + break; + } + auto ind = GetRand(rng, 0, sessions.size() - 1); + sessions.erase(sessions.begin() + ind); + break; + } + } + } + return successCount; + } + + Y_UNIT_TEST(StressTestSync1) { + ui32 activeSessionsLimit = 1; + + TDefaultTestSetup setup(activeSessionsLimit); + auto& client = setup.GetClient(); + + RunStressTestSync(1000, activeSessionsLimit, client); + + std::this_thread::sleep_for(std::chrono::milliseconds(10000)); + + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); + UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), activeSessionsLimit); + } + + Y_UNIT_TEST(StressTestSync10) { + ui32 activeSessionsLimit = 10; + + TDefaultTestSetup setup(activeSessionsLimit); + auto& client = setup.GetClient(); + + RunStressTestSync(1000, activeSessionsLimit, client); + + std::this_thread::sleep_for(std::chrono::milliseconds(10000)); + + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); + UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), activeSessionsLimit); + } + + ui32 RunStressTestAsync(ui32 n, ui32 nThreads, NYdb::NTable::TTableClient& client) { + std::atomic<ui32> successCount(0); + std::atomic<ui32> jobIndex(0); + + auto job = [&client, &successCount, &jobIndex, n]() mutable { + std::mt19937 rng(++jobIndex); + for (ui32 i = 0; i < n; ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds(GetRand(rng, 1, 100))); + auto sessionFuture = client.GetSession(); + std::this_thread::sleep_for(std::chrono::milliseconds(GetRand(rng, 1, 100))); + auto session = sessionFuture.ExtractValueSync(); + std::this_thread::sleep_for(std::chrono::milliseconds(GetRand(rng, 1, 100))); + successCount += session.IsSuccess(); + } + }; + + IThreadFactory* pool = SystemThreadFactory(); + TVector<TAutoPtr<IThreadFactory::IThread>> threads; + threads.resize(nThreads); + for (ui32 i = 0; i < nThreads; i++) { + threads[i] = pool->Run(job); + } + for (ui32 i = 0; i < nThreads; i++) { + threads[i]->Join(); + } + + return successCount; + } + + Y_UNIT_TEST(StressTestAsync1) { + ui32 activeSessionsLimit = 1; + + TDefaultTestSetup setup(activeSessionsLimit); + auto& client = setup.GetClient(); + + RunStressTestAsync(100, 10, client); + + std::this_thread::sleep_for(std::chrono::milliseconds(10000)); + + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); + UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), activeSessionsLimit); + } + + Y_UNIT_TEST(StressTestAsync10) { + ui32 activeSessionsLimit = 10; + + TDefaultTestSetup setup(activeSessionsLimit); + auto& client = setup.GetClient(); + + RunStressTestAsync(1000, 10, client); + + std::this_thread::sleep_for(std::chrono::milliseconds(10000)); + + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); + UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), activeSessionsLimit); + } + + void TestPeriodicTask(ui32 activeSessionsLimit, NYdb::NTable::TTableClient& client) { + std::vector<NThreading::TFuture<NYdb::NTable::TCreateSessionResult>> sessionFutures; + std::vector<NYdb::NTable::TCreateSessionResult> sessions; + + for (ui32 i = 0; i < activeSessionsLimit; ++i) { + sessions.emplace_back(client.GetSession().ExtractValueSync()); + UNIT_ASSERT_VALUES_EQUAL(sessions.back().IsSuccess(), true); + } + + for (ui32 i = 0; i < activeSessionsLimit; ++i) { + sessionFutures.emplace_back(client.GetSession()); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + for (auto& sessionFuture : sessionFutures) { + UNIT_ASSERT(!sessionFuture.HasValue()); + } + + // Wait for wait session timeout + std::this_thread::sleep_for(std::chrono::milliseconds(10000)); + + for (auto& sessionFuture : sessionFutures) { + UNIT_ASSERT(sessionFuture.HasValue()); + UNIT_ASSERT(!sessionFuture.ExtractValueSync().IsSuccess()); + } + + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), activeSessionsLimit); + + sessionFutures.clear(); + sessions.clear(); + + std::this_thread::sleep_for(std::chrono::milliseconds(10000)); + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); + UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), activeSessionsLimit); + } + + Y_UNIT_TEST(PeriodicTask1) { + ui32 activeSessionsLimit = 1; + + TDefaultTestSetup setup(activeSessionsLimit); + auto& client = setup.GetClient(); + + TestPeriodicTask(activeSessionsLimit, client); + } + + Y_UNIT_TEST(PeriodicTask10) { + ui32 activeSessionsLimit = 10; + + TDefaultTestSetup setup(activeSessionsLimit); + auto& client = setup.GetClient(); + + TestPeriodicTask(activeSessionsLimit, client); + } + + Y_UNIT_TEST(FailTest) { + // This test reproduces bug from KIKIMR-18063 + TDefaultTestSetup setup(1); + auto& client = setup.GetClient(); + + auto sessionFromPool = client.GetSession().ExtractValueSync(); + auto futureInWaitPool = client.GetSession(); + + { + auto standaloneSessionThatWillBeBroken = client.CreateSession().ExtractValueSync(); + auto res = standaloneSessionThatWillBeBroken.GetSession().ExecuteDataQuery("SELECT COUNT(*) FROM `Root/Test`;", + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), + NYdb::NTable::TExecDataQuerySettings().ClientTimeout(TDuration::MicroSeconds(10))).GetValueSync(); + } + } +} |