aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobdrynkin <robdrynkin@yandex-team.com>2023-06-12 10:52:35 +0300
committerrobdrynkin <robdrynkin@yandex-team.com>2023-06-12 10:52:35 +0300
commitd4d1325a138f2bea1fbb4aa9ef8266382f7a09d1 (patch)
tree325e185ce471e523420dff780e94ac739269ef0f
parent90abfcac34c50a1d082fd4e6f6e237a4b56d31c5 (diff)
downloadydb-d4d1325a138f2bea1fbb4aa9ef8266382f7a09d1.tar.gz
Add waiters queue + refactoring
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h13
-rw-r--r--ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.darwin-x86_64.txt3
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.linux-aarch64.txt3
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.linux-x86_64.txt3
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.windows-x86_64.txt3
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/client_session.cpp7
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/client_session.h4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/readers.cpp103
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/readers.h66
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp346
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.h90
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp1135
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h369
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/table.cpp1730
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/table.h8
-rw-r--r--ydb/services/ydb/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/services/ydb/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/services/ydb/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/services/ydb/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.darwin-x86_64.txt81
-rw-r--r--ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.linux-aarch64.txt84
-rw-r--r--ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.linux-x86_64.txt86
-rw-r--r--ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.txt17
-rw-r--r--ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.windows-x86_64.txt74
-rw-r--r--ydb/services/ydb/sdk_sessions_pool_ut/sdk_sessions_pool_ut.cpp448
28 files changed, 2947 insertions, 1740 deletions
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h b/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h
index e843cda0e0..2665b62aec 100644
--- a/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h
+++ b/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h
@@ -183,13 +183,18 @@ public:
struct TSessionPoolStatCollector {
TSessionPoolStatCollector(::NMonitoring::TIntGauge* activeSessions = nullptr
, ::NMonitoring::TIntGauge* inPoolSessions = nullptr
- , ::NMonitoring::TRate* fakeSessions = nullptr)
- : ActiveSessions(activeSessions), InPoolSessions(inPoolSessions), FakeSessions(fakeSessions)
+ , ::NMonitoring::TRate* fakeSessions = nullptr
+ , ::NMonitoring::TIntGauge* waiters = nullptr)
+ : ActiveSessions(activeSessions)
+ , InPoolSessions(inPoolSessions)
+ , FakeSessions(fakeSessions)
+ , Waiters(waiters)
{ }
::NMonitoring::TIntGauge* ActiveSessions;
::NMonitoring::TIntGauge* InPoolSessions;
::NMonitoring::TRate* FakeSessions;
+ ::NMonitoring::TIntGauge* Waiters;
};
struct TClientRetryOperationStatCollector {
@@ -266,6 +271,7 @@ public:
CacheMiss_.Set(sensorsRegistry->Rate({ DatabaseLabel_, {"sensor", "Request/ClientQueryCacheMiss"} }));
ActiveSessions_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "Sessions/InUse"} }));
InPoolSessions_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "Sessions/InPool"} }));
+ Waiters_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "Sessions/WaitForReturn"} }));
SessionCV_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "SessionBalancer/Variation"} }));
SessionRemovedDueBalancing_.Set(sensorsRegistry->Rate({ DatabaseLabel_, {"sensor", "SessionBalancer/SessionsRemoved"} }));
RequestMigrated_.Set(sensorsRegistry->Rate({ DatabaseLabel_, {"sensor", "SessionBalancer/RequestsMigrated"} }));
@@ -348,7 +354,7 @@ public:
return TSessionPoolStatCollector();
}
- return TSessionPoolStatCollector(ActiveSessions_.Get(), InPoolSessions_.Get(), FakeSessions_.Get());
+ return TSessionPoolStatCollector(ActiveSessions_.Get(), InPoolSessions_.Get(), FakeSessions_.Get(), Waiters_.Get());
}
TClientStatCollector GetClientStatCollector() {
@@ -386,6 +392,7 @@ private:
TAtomicCounter<::NMonitoring::TRate> DiscoveryFailDueTransportError_;
TAtomicPointer<::NMonitoring::TIntGauge> ActiveSessions_;
TAtomicPointer<::NMonitoring::TIntGauge> InPoolSessions_;
+ TAtomicPointer<::NMonitoring::TIntGauge> Waiters_;
TAtomicCounter<::NMonitoring::TIntGauge> SessionCV_;
TAtomicCounter<::NMonitoring::TRate> SessionRemovedDueBalancing_;
TAtomicCounter<::NMonitoring::TRate> RequestMigrated_;
diff --git a/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt b/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt
index 9183195ace..fad066f801 100644
--- a/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt
+++ b/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt
@@ -1 +1 @@
-2.4.0 \ No newline at end of file
+2.5.0 \ No newline at end of file
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.darwin-x86_64.txt
index 2569e1c3f0..a82f241d2c 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.darwin-x86_64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.darwin-x86_64.txt
@@ -21,5 +21,8 @@ target_link_libraries(client-ydb_table-impl PUBLIC
target_sources(client-ydb_table-impl PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/data_query.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/readers.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp
)
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.linux-aarch64.txt
index e56b888c90..b3b691a276 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.linux-aarch64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.linux-aarch64.txt
@@ -22,5 +22,8 @@ target_link_libraries(client-ydb_table-impl PUBLIC
target_sources(client-ydb_table-impl PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/data_query.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/readers.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp
)
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.linux-x86_64.txt
index e56b888c90..b3b691a276 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.linux-x86_64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.linux-x86_64.txt
@@ -22,5 +22,8 @@ target_link_libraries(client-ydb_table-impl PUBLIC
target_sources(client-ydb_table-impl PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/data_query.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/readers.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp
)
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.windows-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.windows-x86_64.txt
index 2569e1c3f0..a82f241d2c 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.windows-x86_64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/CMakeLists.windows-x86_64.txt
@@ -21,5 +21,8 @@ target_link_libraries(client-ydb_table-impl PUBLIC
target_sources(client-ydb_table-impl PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/data_query.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/readers.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp
)
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.cpp
index 7cc2925480..a15d9d9fa1 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.cpp
@@ -26,7 +26,7 @@ ui64 GetNodeIdFromSession(const TString& sessionId) {
return 0;
}
-TSession::TImpl::TImpl(const TString& sessionId, const TString& endpoint, bool useQueryCache, ui32 queryCacheSize)
+TSession::TImpl::TImpl(const TString& sessionId, const TString& endpoint, bool useQueryCache, ui32 queryCacheSize, bool isOwnedBySessionPool)
: SessionId_(sessionId)
, EndpointKey_(endpoint, GetNodeIdFromSession(sessionId))
, State_(S_STANDALONE)
@@ -36,6 +36,7 @@ TSession::TImpl::TImpl(const TString& sessionId, const TString& endpoint, bool u
, TimeToTouch_(TInstant::Now())
, TimeInPast_(TInstant::Now())
, NeedUpdateActiveCounter_(false)
+ , IsOwnedBySessionPool_(isOwnedBySessionPool)
{}
TSession::TImpl::~TImpl() {
@@ -89,6 +90,10 @@ void TSession::TImpl::MarkIdle() {
NeedUpdateActiveCounter_ = false;
}
+bool TSession::TImpl::IsOwnedBySessionPool() const {
+ return IsOwnedBySessionPool_;
+}
+
TSession::TImpl::EState TSession::TImpl::GetState() const {
// See comments in InjectSessionStatusInterception about lock
with_lock(Lock_) {
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.h b/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.h
index 753016e25e..9fb0330fe4 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.h
@@ -27,7 +27,7 @@ class TSession::TImpl : public TEndpointObj {
#ifdef YDB_IMPL_TABLE_CLIENT_SESSION_UT
public:
#endif
- TImpl(const TString& sessionId, const TString& endpoint, bool useQueryCache, ui32 queryCacheSize);
+ TImpl(const TString& sessionId, const TString& endpoint, bool useQueryCache, ui32 queryCacheSize, bool isOwnedBySessionPool);
public:
enum EState {
S_STANDALONE,
@@ -59,6 +59,7 @@ public:
void MarkStandalone();
void MarkActive();
void MarkIdle();
+ bool IsOwnedBySessionPool() const;
EState GetState() const;
void SetNeedUpdateActiveCounter(bool flag);
bool NeedUpdateActiveCounter() const;
@@ -100,6 +101,7 @@ private:
// TODO: suboptimal because need lock for atomic change from interceptor
// Rewrite with bit field
bool NeedUpdateActiveCounter_;
+ const bool IsOwnedBySessionPool_;
};
} // namespace NTable
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/readers.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/readers.cpp
new file mode 100644
index 0000000000..721337dbde
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/readers.cpp
@@ -0,0 +1,103 @@
+#include "readers.h"
+
+#include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h>
+
+
+namespace NYdb {
+namespace NTable {
+
+using namespace NThreading;
+
+
+TTablePartIterator::TReaderImpl::TReaderImpl(TStreamProcessorPtr streamProcessor, const TString& endpoint)
+ : StreamProcessor_(streamProcessor)
+ , Finished_(false)
+ , Endpoint_(endpoint)
+{}
+
+TTablePartIterator::TReaderImpl::~TReaderImpl() {
+ StreamProcessor_->Cancel();
+}
+
+bool TTablePartIterator::TReaderImpl::IsFinished() {
+ return Finished_;
+}
+
+TAsyncSimpleStreamPart<TResultSet> TTablePartIterator::TReaderImpl::ReadNext(std::shared_ptr<TSelf> self) {
+ auto promise = NThreading::NewPromise<TSimpleStreamPart<TResultSet>>();
+ // Capture self - guarantee no dtor call during the read
+ auto readCb = [self, promise](TGRpcStatus&& grpcStatus) mutable {
+ std::optional<TReadTableSnapshot> snapshot;
+ if (self->Response_.has_snapshot()) {
+ snapshot.emplace(
+ self->Response_.snapshot().plan_step(),
+ self->Response_.snapshot().tx_id());
+ }
+ if (!grpcStatus.Ok()) {
+ self->Finished_ = true;
+ promise.SetValue({TResultSet(std::move(*self->Response_.mutable_result()->mutable_result_set())),
+ TStatus(TPlainStatus(grpcStatus, self->Endpoint_)),
+ snapshot});
+ } else {
+ NYql::TIssues issues;
+ NYql::IssuesFromMessage(self->Response_.issues(), issues);
+ EStatus clientStatus = static_cast<EStatus>(self->Response_.status());
+ promise.SetValue({TResultSet(std::move(*self->Response_.mutable_result()->mutable_result_set())),
+ TStatus(clientStatus, std::move(issues)),
+ snapshot});
+ }
+ };
+ StreamProcessor_->Read(&Response_, readCb);
+ return promise.GetFuture();
+}
+
+
+
+TScanQueryPartIterator::TReaderImpl::TReaderImpl(TStreamProcessorPtr streamProcessor, const TString& endpoint)
+ : StreamProcessor_(streamProcessor)
+ , Finished_(false)
+ , Endpoint_(endpoint)
+{}
+
+TScanQueryPartIterator::TReaderImpl::~TReaderImpl() {
+ StreamProcessor_->Cancel();
+}
+
+bool TScanQueryPartIterator::TReaderImpl::IsFinished() const {
+ return Finished_;
+}
+
+TAsyncScanQueryPart TScanQueryPartIterator::TReaderImpl::ReadNext(std::shared_ptr<TSelf> self) {
+ auto promise = NThreading::NewPromise<TScanQueryPart>();
+ // Capture self - guarantee no dtor call during the read
+ auto readCb = [self, promise](TGRpcStatus&& grpcStatus) mutable {
+ if (!grpcStatus.Ok()) {
+ self->Finished_ = true;
+ promise.SetValue({TStatus(TPlainStatus(grpcStatus, self->Endpoint_))});
+ } else {
+ NYql::TIssues issues;
+ NYql::IssuesFromMessage(self->Response_.issues(), issues);
+ EStatus clientStatus = static_cast<EStatus>(self->Response_.status());
+ // TODO: Add headers for streaming calls.
+ TPlainStatus plainStatus{clientStatus, std::move(issues), self->Endpoint_, {}};
+ TStatus status{std::move(plainStatus)};
+ TMaybe<TQueryStats> queryStats;
+
+ if (self->Response_.result().has_query_stats()) {
+ queryStats = TQueryStats(self->Response_.result().query_stats());
+ }
+
+ if (self->Response_.result().has_result_set()) {
+ promise.SetValue({std::move(status),
+ TResultSet(std::move(*self->Response_.mutable_result()->mutable_result_set())), queryStats});
+ } else {
+ promise.SetValue({std::move(status), queryStats});
+ }
+ }
+ };
+ StreamProcessor_->Read(&Response_, readCb);
+ return promise.GetFuture();
+}
+
+}
+}
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/readers.h b/ydb/public/sdk/cpp/client/ydb_table/impl/readers.h
new file mode 100644
index 0000000000..8103ce0925
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/readers.h
@@ -0,0 +1,66 @@
+#pragma once
+
+#include <ydb/public/sdk/cpp/client/resources/ydb_resources.h>
+
+#include <ydb/public/api/grpc/ydb_table_v1.grpc.pb.h>
+#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
+
+#include <util/random/random.h>
+
+#include "client_session.h"
+#include "data_query.h"
+#include "request_migrator.h"
+
+
+namespace NYdb {
+namespace NTable {
+
+using namespace NThreading;
+
+
+class TTablePartIterator::TReaderImpl {
+public:
+ using TSelf = TTablePartIterator::TReaderImpl;
+ using TResponse = Ydb::Table::ReadTableResponse;
+ using TStreamProcessorPtr = NGrpc::IStreamRequestReadProcessor<TResponse>::TPtr;
+ using TReadCallback = NGrpc::IStreamRequestReadProcessor<TResponse>::TReadCallback;
+ using TGRpcStatus = NGrpc::TGrpcStatus;
+ using TBatchReadResult = std::pair<TResponse, TGRpcStatus>;
+
+ TReaderImpl(TStreamProcessorPtr streamProcessor, const TString& endpoint);
+ ~TReaderImpl();
+ bool IsFinished();
+ TAsyncSimpleStreamPart<TResultSet> ReadNext(std::shared_ptr<TSelf> self);
+
+private:
+ TStreamProcessorPtr StreamProcessor_;
+ TResponse Response_;
+ bool Finished_;
+ TString Endpoint_;
+};
+
+
+class TScanQueryPartIterator::TReaderImpl {
+public:
+ using TSelf = TScanQueryPartIterator::TReaderImpl;
+ using TResponse = Ydb::Table::ExecuteScanQueryPartialResponse;
+ using TStreamProcessorPtr = NGrpc::IStreamRequestReadProcessor<TResponse>::TPtr;
+ using TReadCallback = NGrpc::IStreamRequestReadProcessor<TResponse>::TReadCallback;
+ using TGRpcStatus = NGrpc::TGrpcStatus;
+ using TBatchReadResult = std::pair<TResponse, TGRpcStatus>;
+
+ TReaderImpl(TStreamProcessorPtr streamProcessor, const TString& endpoint);
+ ~TReaderImpl();
+ bool IsFinished() const;
+ TAsyncScanQueryPart ReadNext(std::shared_ptr<TSelf> self);
+
+private:
+ TStreamProcessorPtr StreamProcessor_;
+ TResponse Response_;
+ bool Finished_;
+ TString Endpoint_;
+};
+
+
+}
+}
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp
index ff1729043a..35f91b69b0 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp
@@ -40,7 +40,7 @@ void TRequestMigrator::SetHost(ui64 nodeId) {
CurHost_ = nodeId;
}
-bool TRequestMigrator::IsOurSession(TSession::TImpl* session) const {
+bool TRequestMigrator::IsOurSession(const TSession::TImpl* session) const {
if (!CurHost_)
return false;
@@ -65,7 +65,7 @@ bool TRequestMigrator::Reset() {
}
}
-bool TRequestMigrator::DoCheckAndMigrate(TSession::TImpl* session) {
+bool TRequestMigrator::DoCheckAndMigrate(const TSession::TImpl* session) {
if (session->GetEndpoint().empty())
return false;
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h b/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h
index 2eb7e1b609..fbe6875b3a 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h
@@ -51,13 +51,13 @@ public:
// Returns false if session is not suitable or unable to get lock to start migration
// Returns true if session is suitable in this case Unlink methos on the session is called
// This methos is thread safe.
- bool DoCheckAndMigrate(TSession::TImpl* session);
+ bool DoCheckAndMigrate(const TSession::TImpl* session);
// Reset migrator to initiall state if migration was not started and returns true
// Returns false if migration was started
bool Reset();
private:
- bool IsOurSession(TSession::TImpl* session) const;
+ bool IsOurSession(const TSession::TImpl* session) const;
ui64 CurHost_ = 0;
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp
new file mode 100644
index 0000000000..1e360b78af
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp
@@ -0,0 +1,346 @@
+#include "session_pool.h"
+#include "table_client.h"
+
+
+namespace NYdb {
+namespace NTable {
+
+using namespace NThreading;
+
+
+TSessionPool::TWaitersQueue::TWaitersQueue(ui32 maxQueueSize, TDuration maxWaitSessionTimeout)
+ : MaxQueueSize_(maxQueueSize)
+ , MaxWaitSessionTimeout_(maxWaitSessionTimeout)
+{
+}
+
+bool TSessionPool::TWaitersQueue::TryPush(NThreading::TPromise<TCreateSessionResult>& p) {
+ if (Waiters_.size() < MaxQueueSize_) {
+ Waiters_.insert(std::make_pair(TInstant::Now(), p));
+ return true;
+ }
+ return false;
+}
+
+TMaybe<NThreading::TPromise<TCreateSessionResult>> TSessionPool::TWaitersQueue::TryGet() {
+ if (Waiters_.empty()) {
+ return {};
+ }
+ auto it = Waiters_.begin();
+ auto result = it->second;
+ Waiters_.erase(it);
+ return result;
+}
+
+void TSessionPool::TWaitersQueue::GetOld(TInstant now, TVector<NThreading::TPromise<TCreateSessionResult>>& oldWaiters) {
+ auto it = Waiters_.begin();
+ while (it != Waiters_.end()) {
+ if (now < it->first + MaxWaitSessionTimeout_)
+ break;
+
+ oldWaiters.emplace_back(std::move(it->second));
+
+ Waiters_.erase(it++);
+ }
+}
+
+ui32 TSessionPool::TWaitersQueue::Size() const {
+ return Waiters_.size();
+}
+
+
+TSessionPool::TSessionPool(ui32 maxActiveSessions)
+ : Closed_(false)
+ , WaitersQueue_(maxActiveSessions * 10)
+ , ActiveSessions_(0)
+ , MaxActiveSessions_(maxActiveSessions)
+{}
+
+void TTableClient::TImpl::CloseAndDeleteSession(std::unique_ptr<TSession::TImpl>&& impl,
+ std::shared_ptr<TTableClient::TImpl> client) {
+ std::shared_ptr<TSession::TImpl> deleteSession(
+ impl.release(),
+ TSession::TImpl::GetSmartDeleter(client));
+
+ deleteSession->MarkBroken();
+}
+
+void TSessionPool::CreateFakeSession(
+ NThreading::TPromise<TCreateSessionResult>& promise,
+ std::shared_ptr<TTableClient::TImpl> client)
+{
+ TSession session(client, "", "", false);
+ // Mark standalone to prevent returning to session pool
+ session.SessionImpl_->MarkStandalone();
+ TCreateSessionResult val(
+ TStatus(
+ TPlainStatus(
+ EStatus::CLIENT_RESOURCE_EXHAUSTED,
+ "Active sessions limit exceeded"
+ )
+ ),
+ std::move(session)
+ );
+
+ client->ScheduleTaskUnsafe([promise, val{std::move(val)}]() mutable {
+ promise.SetValue(std::move(val));
+ }, TDuration());
+}
+
+void TSessionPool::MakeSessionPromiseFromSession(
+ TSession::TImpl* session,
+ NThreading::TPromise<TCreateSessionResult>& promise,
+ std::shared_ptr<TTableClient::TImpl> client
+) {
+ Y_VERIFY(session->GetState() == TSession::TImpl::S_IDLE);
+ Y_VERIFY(!session->GetTimeInterval());
+ session->MarkActive();
+ session->SetNeedUpdateActiveCounter(true);
+
+ TCreateSessionResult val(
+ TStatus(TPlainStatus()),
+ TSession(
+ client,
+ std::shared_ptr<TSession::TImpl>(
+ session, TSession::TImpl::GetSmartDeleter(client)
+ )
+ )
+ );
+
+ client->ScheduleTaskUnsafe(
+ [promise, val{std::move(val)}]() mutable {
+ promise.SetValue(std::move(val));
+ },
+ TDuration()
+ );
+}
+
+TAsyncCreateSessionResult TSessionPool::GetSession(
+ std::shared_ptr<TTableClient::TImpl> client,
+ const TCreateSessionSettings& settings)
+{
+ auto createSessionPromise = NewPromise<TCreateSessionResult>();
+ std::unique_ptr<TSession::TImpl> sessionImpl;
+ enum class TSessionSource {
+ Pool,
+ Waiter,
+ Error
+ } sessionSource = TSessionSource::Pool;
+
+ {
+ std::lock_guard guard(Mtx_);
+
+ if (MaxActiveSessions_ == 0 || ActiveSessions_ < MaxActiveSessions_) {
+ IncrementActiveCounterUnsafe();
+ } else if (WaitersQueue_.TryPush(createSessionPromise)) {
+ sessionSource = TSessionSource::Waiter;
+ } else {
+ sessionSource = TSessionSource::Error;
+ }
+ if (!Sessions_.empty()) {
+ auto it = std::prev(Sessions_.end());
+ sessionImpl = std::move(it->second);
+ Sessions_.erase(it);
+ }
+
+ UpdateStats();
+ }
+ if (sessionSource == TSessionSource::Waiter) {
+ // Nothing to do here
+ } else if (sessionSource == TSessionSource::Error) {
+ FakeSessionsCounter_.Inc();
+ CreateFakeSession(createSessionPromise, client);
+ } else if (sessionImpl) {
+ MakeSessionPromiseFromSession(sessionImpl.release(), createSessionPromise, client);
+ } else {
+ const auto& sessionResult = client->CreateSession(settings, false);
+ sessionResult.Subscribe(TSession::TImpl::GetSessionInspector(createSessionPromise, client, settings, 0, true));
+ }
+
+ return createSessionPromise.GetFuture();
+}
+
+bool TSessionPool::CheckAndFeedWaiterNewSession(std::shared_ptr<TTableClient::TImpl> client) {
+ NThreading::TPromise<TCreateSessionResult> createSessionPromise;
+ {
+ std::lock_guard guard(Mtx_);
+ if (Closed_)
+ return false;
+
+ if (auto prom = WaitersQueue_.TryGet()) {
+ createSessionPromise = std::move(*prom);
+ } else {
+ return false;
+ }
+ }
+
+ // This code can be called from client dtors. It may be unsafe to
+ // execute grpc call directly...
+ client->ScheduleTaskUnsafe([createSessionPromise, client]() mutable {
+ TCreateSessionSettings settings;
+ settings.ClientTimeout(CREATE_SESSION_INTERNAL_TIMEOUT);
+
+ const auto& sessionResult = client->CreateSession(settings, false);
+ sessionResult.Subscribe(TSession::TImpl::GetSessionInspector(createSessionPromise, client, settings, 0, true));
+ }, TDuration());
+ return true;
+}
+
+bool TSessionPool::ReturnSession(TSession::TImpl* impl, bool active, std::shared_ptr<TTableClient::TImpl> client) {
+ // Do not set promise under the session pool lock
+ NThreading::TPromise<TCreateSessionResult> createSessionPromise;
+ {
+ std::lock_guard guard(Mtx_);
+ if (Closed_)
+ return false;
+
+ if (auto prom = WaitersQueue_.TryGet()) {
+ createSessionPromise = std::move(*prom);
+ if (!active)
+ IncrementActiveCounterUnsafe();
+ } else {
+ Sessions_.emplace(std::make_pair(impl->GetTimeToTouchFast(), impl));
+
+ if (active) {
+ Y_VERIFY(ActiveSessions_);
+ ActiveSessions_--;
+ impl->SetNeedUpdateActiveCounter(false);
+ }
+ }
+ UpdateStats();
+ }
+
+ if (createSessionPromise.Initialized()) {
+ MakeSessionPromiseFromSession(impl, createSessionPromise, client);
+ }
+
+ return true;
+}
+
+void TSessionPool::DecrementActiveCounter() {
+ std::lock_guard guard(Mtx_);
+ Y_VERIFY(ActiveSessions_);
+ ActiveSessions_--;
+ UpdateStats();
+}
+
+void TSessionPool::IncrementActiveCounterUnsafe() {
+ ActiveSessions_++;
+ UpdateStats();
+}
+
+void TSessionPool::Drain(std::function<bool(std::unique_ptr<TSession::TImpl>&&)> cb, bool close) {
+ std::lock_guard guard(Mtx_);
+ Closed_ = close;
+ for (auto it = Sessions_.begin(); it != Sessions_.end();) {
+ const bool cont = cb(std::move(it->second));
+ it = Sessions_.erase(it);
+ if (!cont)
+ break;
+ }
+ UpdateStats();
+}
+
+TPeriodicCb TSessionPool::CreatePeriodicTask(std::weak_ptr<TTableClient::TImpl> weakClient,
+ TKeepAliveCmd&& cmd, TDeletePredicate&& deletePredicate)
+{
+ auto periodicCb = [this, weakClient, cmd=std::move(cmd), deletePredicate=std::move(deletePredicate)](NYql::TIssues&&, EStatus status) {
+ if (status != EStatus::SUCCESS) {
+ return false;
+ }
+
+ auto strongClient = weakClient.lock();
+ if (!strongClient) {
+ // No more clients alive - no need to run periodic,
+ // moreover it is unsafe to touch this ptr!
+ return false;
+ } else {
+ auto keepAliveBatchSize = PERIODIC_ACTION_BATCH_SIZE;
+ TVector<std::unique_ptr<TSession::TImpl>> sessionsToTouch;
+ sessionsToTouch.reserve(keepAliveBatchSize);
+ TVector<std::unique_ptr<TSession::TImpl>> sessionsToDelete;
+ sessionsToDelete.reserve(keepAliveBatchSize);
+ TVector<NThreading::TPromise<TCreateSessionResult>> waitersToReplyError;
+ waitersToReplyError.reserve(keepAliveBatchSize);
+ const auto now = TInstant::Now();
+ {
+ std::lock_guard guard(Mtx_);
+ {
+ auto& sessions = Sessions_;
+
+ auto it = sessions.begin();
+ while (it != sessions.end() && keepAliveBatchSize--) {
+ if (now < it->second->GetTimeToTouchFast())
+ break;
+
+ if (deletePredicate(it->second.get(), strongClient.get(), sessions.size())) {
+ sessionsToDelete.emplace_back(std::move(it->second));
+ } else {
+ sessionsToTouch.emplace_back(std::move(it->second));
+ }
+ sessions.erase(it++);
+ }
+ }
+
+ WaitersQueue_.GetOld(now, waitersToReplyError);
+
+ UpdateStats();
+ }
+
+ for (auto& sessionImpl : sessionsToTouch) {
+ if (sessionImpl) {
+ Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_IDLE);
+ TSession session(strongClient, std::shared_ptr<TSession::TImpl>(
+ sessionImpl.release(),
+ TSession::TImpl::GetSmartDeleter(strongClient)));
+ cmd(session);
+ }
+ }
+
+ for (auto& sessionImpl : sessionsToDelete) {
+ if (sessionImpl) {
+ Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_IDLE);
+ TTableClient::TImpl::CloseAndDeleteSession(std::move(sessionImpl), strongClient);
+ }
+ }
+
+ for (auto& waiter : waitersToReplyError) {
+ FakeSessionsCounter_.Inc();
+ CreateFakeSession(waiter, strongClient);
+ }
+ }
+
+ return true;
+ };
+ return periodicCb;
+}
+
+i64 TSessionPool::GetActiveSessions() const {
+ std::lock_guard guard(Mtx_);
+ return ActiveSessions_;
+}
+
+i64 TSessionPool::GetActiveSessionsLimit() const {
+ return MaxActiveSessions_;
+}
+
+i64 TSessionPool::GetCurrentPoolSize() const {
+ std::lock_guard guard(Mtx_);
+ return Sessions_.size();
+}
+
+void TSessionPool::SetStatCollector(NSdkStats::TStatCollector::TSessionPoolStatCollector statCollector) {
+ ActiveSessionsCounter_.Set(statCollector.ActiveSessions);
+ InPoolSessionsCounter_.Set(statCollector.InPoolSessions);
+ FakeSessionsCounter_.Set(statCollector.FakeSessions);
+ SessionWaiterCounter_.Set(statCollector.Waiters);
+}
+
+void TSessionPool::UpdateStats() {
+ ActiveSessionsCounter_.Apply(ActiveSessions_);
+ InPoolSessionsCounter_.Apply(Sessions_.size());
+ SessionWaiterCounter_.Apply(WaitersQueue_.Size());
+}
+
+}
+}
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.h b/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.h
new file mode 100644
index 0000000000..983d675bce
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.h
@@ -0,0 +1,90 @@
+#pragma once
+
+#include "client_session.h"
+
+#include <ydb/public/sdk/cpp/client/ydb_types/core_facility/core_facility.h>
+
+#include <util/generic/map.h>
+
+
+namespace NYdb {
+namespace NTable {
+
+using namespace NThreading;
+
+
+constexpr TDuration MAX_WAIT_SESSION_TIMEOUT = TDuration::Seconds(5); //Max time to wait session
+constexpr ui64 PERIODIC_ACTION_BATCH_SIZE = 10; //Max number of tasks to perform during one interval
+constexpr TDuration CREATE_SESSION_INTERNAL_TIMEOUT = TDuration::Seconds(2); //Timeout for createSession call inside session pool
+
+
+class TSessionPool {
+ typedef TAsyncCreateSessionResult
+ (*TAwareSessonProvider)
+ (std::shared_ptr<TTableClient::TImpl> client, const TCreateSessionSettings& settings);
+private:
+ class TWaitersQueue {
+ public:
+ TWaitersQueue(ui32 maxQueueSize, TDuration maxWaitSessionTimeout=MAX_WAIT_SESSION_TIMEOUT);
+
+ bool TryPush(NThreading::TPromise<TCreateSessionResult>& p);
+ TMaybe<NThreading::TPromise<TCreateSessionResult>> TryGet();
+ void GetOld(TInstant now, TVector<NThreading::TPromise<TCreateSessionResult>>& oldWaiters);
+ ui32 Size() const;
+
+ private:
+ const ui32 MaxQueueSize_;
+ const TDuration MaxWaitSessionTimeout_;
+ TMultiMap<TInstant, NThreading::TPromise<TCreateSessionResult>> Waiters_;
+ };
+public:
+ using TKeepAliveCmd = std::function<void(TSession session)>;
+ using TDeletePredicate = std::function<bool(TSession::TImpl* session, TTableClient::TImpl* client, size_t sessionsCount)>;
+ TSessionPool(ui32 maxActiveSessions);
+ // TAwareSessonProvider:
+ // function is called if session pool is empty,
+ // this is used for additional total session count limitation
+ TAsyncCreateSessionResult GetSession(
+ std::shared_ptr<TTableClient::TImpl> client,
+ const TCreateSessionSettings& settings);
+ // Returns true if session returned to pool successfully
+ bool ReturnSession(TSession::TImpl* impl, bool active, std::shared_ptr<TTableClient::TImpl> client);
+ // Returns trun if has waiter and scheduled to create new session
+ // too feed it
+ bool CheckAndFeedWaiterNewSession(std::shared_ptr<TTableClient::TImpl> client);
+ TPeriodicCb CreatePeriodicTask(std::weak_ptr<TTableClient::TImpl> weakClient, TKeepAliveCmd&& cmd, TDeletePredicate&& predicate);
+ i64 GetActiveSessions() const;
+ i64 GetActiveSessionsLimit() const;
+ i64 GetCurrentPoolSize() const;
+ void DecrementActiveCounter();
+ void IncrementActiveCounterUnsafe();
+
+ void Drain(std::function<bool(std::unique_ptr<TSession::TImpl>&&)> cb, bool close);
+ void SetStatCollector(NSdkStats::TStatCollector::TSessionPoolStatCollector collector);
+
+ static void CreateFakeSession(NThreading::TPromise<TCreateSessionResult>& promise,
+ std::shared_ptr<TTableClient::TImpl> client);
+private:
+ void UpdateStats();
+ void MakeSessionPromiseFromSession(
+ TSession::TImpl* session,
+ NThreading::TPromise<TCreateSessionResult>& promise,
+ std::shared_ptr<TTableClient::TImpl> client
+ );
+
+ mutable std::mutex Mtx_;
+ bool Closed_;
+
+ TMultiMap<TInstant, std::unique_ptr<TSession::TImpl>> Sessions_;
+ TWaitersQueue WaitersQueue_;
+
+ i64 ActiveSessions_;
+ const ui32 MaxActiveSessions_;
+ NSdkStats::TSessionCounter ActiveSessionsCounter_;
+ NSdkStats::TSessionCounter InPoolSessionsCounter_;
+ NSdkStats::TSessionCounter SessionWaiterCounter_;
+ NSdkStats::TAtomicCounter<::NMonitoring::TRate> FakeSessionsCounter_;
+};
+
+}
+}
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp
new file mode 100644
index 0000000000..096153cbcf
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp
@@ -0,0 +1,1135 @@
+#include "table_client.h"
+
+
+namespace NYdb {
+namespace NTable {
+
+using namespace NThreading;
+
+
+const TKeepAliveSettings TTableClient::TImpl::KeepAliveSettings = TKeepAliveSettings().ClientTimeout(KEEP_ALIVE_CLIENT_TIMEOUT);
+
+
+bool IsSessionCloseRequested(const TStatus& status) {
+ const auto& meta = status.GetResponseMetadata();
+ auto hints = meta.equal_range(NYdb::YDB_SERVER_HINTS);
+ for(auto it = hints.first; it != hints.second; ++it) {
+ if (it->second == NYdb::YDB_SESSION_CLOSE) {
+ return true;
+ }
+ }
+
+ return false;
+}
+
+TDuration RandomizeThreshold(TDuration duration) {
+ TDuration::TValue value = duration.GetValue();
+ if (KEEP_ALIVE_RANDOM_FRACTION) {
+ const i64 randomLimit = value / KEEP_ALIVE_RANDOM_FRACTION;
+ if (randomLimit < 2)
+ return duration;
+ value += static_cast<i64>(RandomNumber<ui64>(randomLimit));
+ }
+ return TDuration::FromValue(value);
+}
+
+TDuration GetMinTimeToTouch(const TSessionPoolSettings& settings) {
+ return Min(settings.CloseIdleThreshold_, settings.KeepAliveIdleThreshold_);
+}
+
+TDuration GetMaxTimeToTouch(const TSessionPoolSettings& settings) {
+ return Max(settings.CloseIdleThreshold_, settings.KeepAliveIdleThreshold_);
+}
+
+TStatus GetStatus(const TOperation& operation) {
+ return operation.Status();
+}
+
+TStatus GetStatus(const TStatus& status) {
+ return status;
+}
+
+ui32 CalcBackoffTime(const TBackoffSettings& settings, ui32 retryNumber) {
+ ui32 backoffSlots = 1 << std::min(retryNumber, settings.Ceiling_);
+ TDuration maxDuration = settings.SlotDuration_ * backoffSlots;
+
+ double uncertaintyRatio = std::max(std::min(settings.UncertainRatio_, 1.0), 0.0);
+ double uncertaintyMultiplier = RandomNumber<double>() * uncertaintyRatio - uncertaintyRatio + 1.0;
+
+ double durationMs = round(maxDuration.MilliSeconds() * uncertaintyMultiplier);
+
+ return std::max(std::min(durationMs, (double)MAX_BACKOFF_DURATION_MS), 0.0);
+}
+
+
+
+TTableClient::TImpl::TImpl(std::shared_ptr<TGRpcConnectionsImpl>&& connections, const TClientSettings& settings)
+ : TClientImplCommon(std::move(connections), settings)
+ , Settings_(settings)
+ , SessionPool_(Settings_.SessionPoolSettings_.MaxActiveSessions_)
+{
+ if (!DbDriverState_->StatCollector.IsCollecting()) {
+ return;
+ }
+
+ SetStatCollector(DbDriverState_->StatCollector.GetClientStatCollector());
+ SessionPool_.SetStatCollector(DbDriverState_->StatCollector.GetSessionPoolStatCollector());
+}
+
+TTableClient::TImpl::~TImpl() {
+ if (Connections_->GetDrainOnDtors()) {
+ Drain().Wait();
+ }
+}
+
+bool TTableClient::TImpl::LinkObjToEndpoint(const TEndpointKey& endpoint, TEndpointObj* obj, const void* tag) {
+ return DbDriverState_->EndpointPool.LinkObjToEndpoint(endpoint, obj, tag);
+}
+
+void TTableClient::TImpl::InitStopper() {
+ std::weak_ptr<TTableClient::TImpl> weak = shared_from_this();
+ auto cb = [weak]() mutable {
+ auto strong = weak.lock();
+ if (!strong) {
+ auto promise = NThreading::NewPromise<void>();
+ promise.SetException("no more client");
+ return promise.GetFuture();
+ }
+ return strong->Drain();
+ };
+
+ DbDriverState_->AddCb(std::move(cb), TDbDriverState::ENotifyType::STOP);
+}
+
+NThreading::TFuture<void> TTableClient::TImpl::Drain() {
+ TVector<std::unique_ptr<TSession::TImpl>> sessions;
+ // No realocations under lock
+ sessions.reserve(Settings_.SessionPoolSettings_.MaxActiveSessions_);
+ auto drainer = [&sessions](std::unique_ptr<TSession::TImpl>&& impl) mutable {
+ sessions.push_back(std::move(impl));
+ return true;
+ };
+ SessionPool_.Drain(drainer, true);
+ TVector<TAsyncStatus> closeResults;
+ for (auto& s : sessions) {
+ if (s->GetId()) {
+ closeResults.push_back(CloseInternal(s.get()));
+ }
+ }
+ sessions.clear();
+ return NThreading::WaitExceptionOrAll(closeResults);
+}
+
+NThreading::TFuture<void> TTableClient::TImpl::Stop() {
+ return Drain();
+}
+
+void TTableClient::TImpl::ScheduleTask(const std::function<void()>& fn, TDuration timeout) {
+ std::weak_ptr<TTableClient::TImpl> weak = shared_from_this();
+ auto cbGuard = [weak, fn]() {
+ auto strongClient = weak.lock();
+ if (strongClient) {
+ fn();
+ }
+ };
+ Connections_->ScheduleOneTimeTask(std::move(cbGuard), timeout);
+}
+
+void TTableClient::TImpl::ScheduleTaskUnsafe(std::function<void()>&& fn, TDuration timeout) {
+ Connections_->ScheduleOneTimeTask(std::move(fn), timeout);
+}
+
+void TTableClient::TImpl::AsyncBackoff(const TBackoffSettings& settings, ui32 retryNumber, const std::function<void()>& fn) {
+ auto durationMs = CalcBackoffTime(settings, retryNumber);
+ ScheduleTask(fn, TDuration::MilliSeconds(durationMs));
+}
+
+void TTableClient::TImpl::StartPeriodicSessionPoolTask() {
+
+ auto deletePredicate = [](TSession::TImpl* session, TTableClient::TImpl* client, size_t sessionsCount) {
+
+ const auto sessionPoolSettings = client->Settings_.SessionPoolSettings_;
+ const auto spentTime = session->GetTimeToTouchFast() - session->GetTimeInPastFast();
+
+ if (spentTime >= sessionPoolSettings.CloseIdleThreshold_) {
+ if (sessionsCount > sessionPoolSettings.MinPoolSize_) {
+ return true;
+ }
+ }
+
+ return false;
+ };
+
+ auto keepAliveCmd = [](TSession session) {
+
+ Y_VERIFY(session.GetId());
+
+ const auto sessionPoolSettings = session.Client_->Settings_.SessionPoolSettings_;
+ const auto spentTime = session.SessionImpl_->GetTimeToTouchFast() - session.SessionImpl_->GetTimeInPastFast();
+
+ const auto maxTimeToTouch = GetMaxTimeToTouch(session.Client_->Settings_.SessionPoolSettings_);
+ const auto minTimeToTouch = GetMinTimeToTouch(session.Client_->Settings_.SessionPoolSettings_);
+
+ auto calcTimeToNextTouch = [maxTimeToTouch, minTimeToTouch] (const TDuration spent) {
+ auto timeToNextTouch = minTimeToTouch;
+ if (maxTimeToTouch > spent) {
+ auto t = maxTimeToTouch - spent;
+ timeToNextTouch = Min(t, minTimeToTouch);
+ }
+ return timeToNextTouch;
+ };
+
+ if (spentTime >= sessionPoolSettings.KeepAliveIdleThreshold_) {
+
+ // Handle of session status will be done inside InjectSessionStatusInterception routine.
+ // We just need to reschedule time to next call because InjectSessionStatusInterception doesn't
+ // update timeInPast for calls from internal keep alive routine
+ session.KeepAlive(KeepAliveSettings)
+ .Subscribe([spentTime, session, maxTimeToTouch, calcTimeToNextTouch](TAsyncKeepAliveResult asyncResult) {
+ if (!asyncResult.GetValue().IsSuccess())
+ return;
+
+ if (spentTime >= maxTimeToTouch) {
+ auto timeToNextTouch = calcTimeToNextTouch(spentTime);
+ session.SessionImpl_->ScheduleTimeToTouchFast(timeToNextTouch, true);
+ }
+ });
+ return;
+ }
+
+ auto timeToNextTouch = calcTimeToNextTouch(spentTime);
+ session.SessionImpl_->ScheduleTimeToTouchFast(
+ RandomizeThreshold(timeToNextTouch),
+ spentTime >= maxTimeToTouch
+ );
+ };
+
+ std::weak_ptr<TTableClient::TImpl> weak = shared_from_this();
+ Connections_->AddPeriodicTask(
+ SessionPool_.CreatePeriodicTask(
+ weak,
+ std::move(keepAliveCmd),
+ std::move(deletePredicate)
+ ), PERIODIC_ACTION_INTERVAL);
+}
+
+ui64 TTableClient::TImpl::ScanForeignLocations(std::shared_ptr<TTableClient::TImpl> client) {
+ size_t max = 0;
+ ui64 result = 0;
+
+ auto cb = [&result, &max](ui64 nodeId, const IObjRegistryHandle& handle) {
+ const auto sz = handle.Size();
+ if (sz > max) {
+ result = nodeId;
+ max = sz;
+ }
+ };
+
+ client->DbDriverState_->ForEachForeignEndpoint(cb, client.get());
+
+ return result;
+}
+
+std::pair<ui64, size_t> TTableClient::TImpl::ScanLocation(std::shared_ptr<TTableClient::TImpl> client,
+ std::unordered_map<ui64, size_t>& sessions, bool allNodes)
+{
+ std::pair<ui64, size_t> result = {0, 0};
+
+ auto cb = [&result, &sessions](ui64 nodeId, const IObjRegistryHandle& handle) {
+ const auto sz = handle.Size();
+ sessions.insert({nodeId, sz});
+ if (sz > result.second) {
+ result.first = nodeId;
+ result.second = sz;
+ }
+ };
+
+ if (allNodes) {
+ client->DbDriverState_->ForEachEndpoint(cb, client.get());
+ } else {
+ client->DbDriverState_->ForEachLocalEndpoint(cb, client.get());
+ }
+
+ return result;
+}
+
+NMath::TStats TTableClient::TImpl::CalcCV(const std::unordered_map<ui64, size_t>& in) {
+ TVector<size_t> t;
+ t.reserve(in.size());
+ std::transform(in.begin(), in.end(), std::back_inserter(t), [](const std::pair<ui64, size_t>& pair) {
+ return pair.second;
+ });
+ return NMath::CalcCV(t);
+}
+
+void TTableClient::TImpl::StartPeriodicHostScanTask() {
+ std::weak_ptr<TTableClient::TImpl> weak = shared_from_this();
+
+ // The future in completed when we have finished current migrate task
+ // and ready to accept new one
+ std::pair<ui64, size_t> winner = {0, 0};
+
+ auto periodicCb = [weak, winner](NYql::TIssues&&, EStatus status) mutable -> bool {
+
+ if (status != EStatus::SUCCESS) {
+ return false;
+ }
+
+ auto strongClient = weak.lock();
+ if (!strongClient) {
+ return false;
+ } else {
+ TRequestMigrator& migrator = strongClient->RequestMigrator_;
+
+ const auto balancingPolicy = strongClient->DbDriverState_->GetBalancingPolicy();
+
+ // Try to find any host at foreign locations if prefer local dc
+ const ui64 foreignHost = (balancingPolicy == EBalancingPolicy::UsePreferableLocation) ?
+ ScanForeignLocations(strongClient) : 0;
+
+ std::unordered_map<ui64, size_t> hostMap;
+
+ winner = ScanLocation(strongClient, hostMap,
+ balancingPolicy == EBalancingPolicy::UseAllNodes);
+
+ bool forceMigrate = false;
+
+ // There is host in foreign locations
+ if (foreignHost) {
+ // But no hosts at local
+ if (hostMap.empty()) {
+ Y_VERIFY(!winner.second);
+ // Scan whole cluster - we have no local dc
+ winner = ScanLocation(strongClient, hostMap, true);
+ } else {
+ // We have local and foreign hosts, so force migration to local one
+ forceMigrate = true;
+ // Just replace source
+ winner.first = foreignHost;
+ winner.second++;
+ }
+ }
+
+ const auto minCv = strongClient->Settings_.MinSessionCV_;
+
+ const auto stats = CalcCV(hostMap);
+
+ strongClient->DbDriverState_->StatCollector.SetSessionCV(stats.Cv);
+
+ // Just scan to update monitoring counter ^^
+ // Balancing feature is disabled.
+ if (!minCv)
+ return true;
+
+ if (hostMap.size() < 2)
+ return true;
+
+ // Migrate last session only if move from foreign to local
+ if (!forceMigrate && winner.second < 2)
+ return true;
+
+ if (stats.Cv > minCv || forceMigrate) {
+ migrator.SetHost(winner.first);
+ } else {
+ migrator.SetHost(0);
+ }
+ return true;
+ }
+ };
+
+ Connections_->AddPeriodicTask(std::move(periodicCb), HOSTSCAN_PERIODIC_ACTION_INTERVAL);
+}
+
+TAsyncCreateSessionResult TTableClient::TImpl::GetSession(const TCreateSessionSettings& settings) {
+ return SessionPool_.GetSession(shared_from_this(), settings);
+}
+
+i64 TTableClient::TImpl::GetActiveSessionCount() const {
+ return SessionPool_.GetActiveSessions();
+}
+
+i64 TTableClient::TImpl::GetActiveSessionsLimit() const {
+ return SessionPool_.GetActiveSessionsLimit();
+}
+
+i64 TTableClient::TImpl::GetCurrentPoolSize() const {
+ return SessionPool_.GetCurrentPoolSize();
+}
+
+TAsyncCreateSessionResult TTableClient::TImpl::CreateSession(const TCreateSessionSettings& settings, bool standalone,
+ TString preferedLocation)
+{
+ auto request = MakeOperationRequest<Ydb::Table::CreateSessionRequest>(settings);
+
+ auto createSessionPromise = NewPromise<TCreateSessionResult>();
+ auto self = shared_from_this();
+ auto rpcSettings = TRpcRequestSettings::Make(settings);
+ rpcSettings.Header.push_back({NYdb::YDB_CLIENT_CAPABILITIES, NYdb::YDB_CLIENT_CAPABILITY_SESSION_BALANCER});
+
+ auto createSessionExtractor = [createSessionPromise, self, standalone]
+ (google::protobuf::Any* any, TPlainStatus status) mutable {
+ Ydb::Table::CreateSessionResult result;
+ if (any) {
+ any->UnpackTo(&result);
+ }
+ auto session = TSession(self, result.session_id(), status.Endpoint, !standalone);
+ if (status.Ok()) {
+ if (standalone) {
+ session.SessionImpl_->MarkStandalone();
+ } else {
+ session.SessionImpl_->MarkActive();
+ }
+ self->DbDriverState_->StatCollector.IncSessionsOnHost(status.Endpoint);
+ } else {
+ // We do not use SessionStatusInterception for CreateSession request
+ session.SessionImpl_->MarkBroken();
+ }
+ TCreateSessionResult val(TStatus(std::move(status)), std::move(session));
+ createSessionPromise.SetValue(std::move(val));
+ };
+
+ Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::CreateSessionRequest, Ydb::Table::CreateSessionResponse>(
+ std::move(request),
+ createSessionExtractor,
+ &Ydb::Table::V1::TableService::Stub::AsyncCreateSession,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ rpcSettings,
+ TEndpointKey(preferedLocation, 0));
+
+ std::weak_ptr<TDbDriverState> state = DbDriverState_;
+
+ return createSessionPromise.GetFuture();
+}
+
+TAsyncKeepAliveResult TTableClient::TImpl::KeepAlive(const TSession::TImpl* session, const TKeepAliveSettings& settings) {
+ auto request = MakeOperationRequest<Ydb::Table::KeepAliveRequest>(settings);
+ request.set_session_id(session->GetId());
+
+ auto keepAliveResultPromise = NewPromise<TKeepAliveResult>();
+ auto self = shared_from_this();
+
+ auto keepAliveExtractor = [keepAliveResultPromise, self]
+ (google::protobuf::Any* any, TPlainStatus status) mutable {
+ Ydb::Table::KeepAliveResult result;
+ ESessionStatus sessionStatus = ESessionStatus::Unspecified;
+ if (any) {
+ any->UnpackTo(&result);
+
+ switch (result.session_status()) {
+ case Ydb::Table::KeepAliveResult_SessionStatus_SESSION_STATUS_READY:
+ sessionStatus = ESessionStatus::Ready;
+ break;
+ case Ydb::Table::KeepAliveResult_SessionStatus_SESSION_STATUS_BUSY:
+ sessionStatus = ESessionStatus::Busy;
+ break;
+ default:
+ sessionStatus = ESessionStatus::Unspecified;
+ }
+ }
+ TKeepAliveResult val(TStatus(std::move(status)), sessionStatus);
+ keepAliveResultPromise.SetValue(std::move(val));
+ };
+
+ Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::KeepAliveRequest, Ydb::Table::KeepAliveResponse>(
+ std::move(request),
+ keepAliveExtractor,
+ &Ydb::Table::V1::TableService::Stub::AsyncKeepAlive,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings),
+ session->GetEndpointKey());
+
+ return keepAliveResultPromise.GetFuture();
+}
+
+TFuture<TStatus> TTableClient::TImpl::CreateTable(Ydb::Table::CreateTableRequest&& request, const TCreateTableSettings& settings)
+{
+ return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::CreateTableRequest,Ydb::Table::CreateTableResponse>(
+ std::move(request),
+ &Ydb::Table::V1::TableService::Stub::AsyncCreateTable,
+ TRpcRequestSettings::Make(settings));
+}
+
+TFuture<TStatus> TTableClient::TImpl::AlterTable(Ydb::Table::AlterTableRequest&& request, const TAlterTableSettings& settings)
+{
+ return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::AlterTableRequest, Ydb::Table::AlterTableResponse>(
+ std::move(request),
+ &Ydb::Table::V1::TableService::Stub::AsyncAlterTable,
+ TRpcRequestSettings::Make(settings));
+}
+
+TAsyncOperation TTableClient::TImpl::AlterTableLong(Ydb::Table::AlterTableRequest&& request, const TAlterTableSettings& settings)
+{
+ using Ydb::Table::V1::TableService;
+ using Ydb::Table::AlterTableRequest;
+ using Ydb::Table::AlterTableResponse;
+ return RunOperation<TableService, AlterTableRequest, AlterTableResponse, TOperation>(
+ std::move(request),
+ &Ydb::Table::V1::TableService::Stub::AsyncAlterTable,
+ TRpcRequestSettings::Make(settings));
+}
+
+TFuture<TStatus> TTableClient::TImpl::CopyTable(const TString& sessionId, const TString& src, const TString& dst,
+ const TCopyTableSettings& settings)
+{
+ auto request = MakeOperationRequest<Ydb::Table::CopyTableRequest>(settings);
+ request.set_session_id(sessionId);
+ request.set_source_path(src);
+ request.set_destination_path(dst);
+
+ return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::CopyTableRequest, Ydb::Table::CopyTableResponse>(
+ std::move(request),
+ &Ydb::Table::V1::TableService::Stub::AsyncCopyTable,
+ TRpcRequestSettings::Make(settings));
+}
+
+TFuture<TStatus> TTableClient::TImpl::CopyTables(Ydb::Table::CopyTablesRequest&& request, const TCopyTablesSettings& settings)
+{
+ return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::CopyTablesRequest, Ydb::Table::CopyTablesResponse>(
+ std::move(request),
+ &Ydb::Table::V1::TableService::Stub::AsyncCopyTables,
+ TRpcRequestSettings::Make(settings));
+}
+
+TFuture<TStatus> TTableClient::TImpl::RenameTables(Ydb::Table::RenameTablesRequest&& request, const TRenameTablesSettings& settings)
+{
+ return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::RenameTablesRequest, Ydb::Table::RenameTablesResponse>(
+ std::move(request),
+ &Ydb::Table::V1::TableService::Stub::AsyncRenameTables,
+ TRpcRequestSettings::Make(settings));
+}
+
+TFuture<TStatus> TTableClient::TImpl::DropTable(const TString& sessionId, const TString& path, const TDropTableSettings& settings) {
+ auto request = MakeOperationRequest<Ydb::Table::DropTableRequest>(settings);
+ request.set_session_id(sessionId);
+ request.set_path(path);
+
+ return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::DropTableRequest, Ydb::Table::DropTableResponse>(
+ std::move(request),
+ &Ydb::Table::V1::TableService::Stub::AsyncDropTable,
+ TRpcRequestSettings::Make(settings));
+}
+
+TAsyncDescribeTableResult TTableClient::TImpl::DescribeTable(const TString& sessionId, const TString& path, const TDescribeTableSettings& settings) {
+ auto request = MakeOperationRequest<Ydb::Table::DescribeTableRequest>(settings);
+ request.set_session_id(sessionId);
+ request.set_path(path);
+ if (settings.WithKeyShardBoundary_) {
+ request.set_include_shard_key_bounds(true);
+ }
+
+ if (settings.WithTableStatistics_) {
+ request.set_include_table_stats(true);
+ }
+
+ if (settings.WithPartitionStatistics_) {
+ request.set_include_partition_stats(true);
+ }
+
+ auto promise = NewPromise<TDescribeTableResult>();
+
+ auto extractor = [promise, settings]
+ (google::protobuf::Any* any, TPlainStatus status) mutable {
+ Ydb::Table::DescribeTableResult result;
+ if (any) {
+ any->UnpackTo(&result);
+ }
+ TDescribeTableResult describeTableResult(TStatus(std::move(status)),
+ std::move(result), settings);
+ promise.SetValue(std::move(describeTableResult));
+ };
+
+ Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::DescribeTableRequest, Ydb::Table::DescribeTableResponse>(
+ std::move(request),
+ extractor,
+ &Ydb::Table::V1::TableService::Stub::AsyncDescribeTable,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings));
+
+ return promise.GetFuture();
+}
+
+TAsyncPrepareQueryResult TTableClient::TImpl::PrepareDataQuery(const TSession& session, const TString& query,
+ const TPrepareDataQuerySettings& settings)
+{
+ auto request = MakeOperationRequest<Ydb::Table::PrepareDataQueryRequest>(settings);
+ request.set_session_id(session.GetId());
+ request.set_yql_text(query);
+
+ auto promise = NewPromise<TPrepareQueryResult>();
+
+ //See ExecuteDataQueryInternal for explanation
+ auto sessionPtr = new TSession(session);
+ auto extractor = [promise, sessionPtr, query]
+ (google::protobuf::Any* any, TPlainStatus status) mutable {
+ TDataQuery dataQuery(*sessionPtr, query, "");
+
+ if (any) {
+ Ydb::Table::PrepareQueryResult result;
+ any->UnpackTo(&result);
+
+ if (status.Ok()) {
+ dataQuery = TDataQuery(*sessionPtr, query, result.query_id(), result.parameters_types());
+ sessionPtr->SessionImpl_->AddQueryToCache(dataQuery);
+ }
+ }
+
+ TPrepareQueryResult prepareQueryResult(TStatus(std::move(status)),
+ dataQuery, false);
+ delete sessionPtr;
+ promise.SetValue(std::move(prepareQueryResult));
+ };
+
+ CollectQuerySize(query, QuerySizeHistogram);
+
+ Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::PrepareDataQueryRequest, Ydb::Table::PrepareDataQueryResponse>(
+ std::move(request),
+ extractor,
+ &Ydb::Table::V1::TableService::Stub::AsyncPrepareDataQuery,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings),
+ session.SessionImpl_->GetEndpointKey());
+
+ return promise.GetFuture();
+}
+
+TAsyncStatus TTableClient::TImpl::ExecuteSchemeQuery(const TString& sessionId, const TString& query,
+ const TExecSchemeQuerySettings& settings)
+{
+ auto request = MakeOperationRequest<Ydb::Table::ExecuteSchemeQueryRequest>(settings);
+ request.set_session_id(sessionId);
+ request.set_yql_text(query);
+
+ return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::ExecuteSchemeQueryRequest, Ydb::Table::ExecuteSchemeQueryResponse>(
+ std::move(request),
+ &Ydb::Table::V1::TableService::Stub::AsyncExecuteSchemeQuery,
+ TRpcRequestSettings::Make(settings));
+}
+
+TAsyncBeginTransactionResult TTableClient::TImpl::BeginTransaction(const TSession& session, const TTxSettings& txSettings,
+ const TBeginTxSettings& settings)
+{
+ auto request = MakeOperationRequest<Ydb::Table::BeginTransactionRequest>(settings);
+ request.set_session_id(session.GetId());
+ SetTxSettings(txSettings, request.mutable_tx_settings());
+
+ auto promise = NewPromise<TBeginTransactionResult>();
+
+ auto extractor = [promise, session]
+ (google::protobuf::Any* any, TPlainStatus status) mutable {
+ TString txId;
+ if (any) {
+ Ydb::Table::BeginTransactionResult result;
+ any->UnpackTo(&result);
+ txId = result.tx_meta().id();
+ }
+
+ TBeginTransactionResult beginTxResult(TStatus(std::move(status)),
+ TTransaction(session, txId));
+ promise.SetValue(std::move(beginTxResult));
+ };
+
+ Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::BeginTransactionRequest, Ydb::Table::BeginTransactionResponse>(
+ std::move(request),
+ extractor,
+ &Ydb::Table::V1::TableService::Stub::AsyncBeginTransaction,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings),
+ session.SessionImpl_->GetEndpointKey());
+
+ return promise.GetFuture();
+}
+
+TAsyncCommitTransactionResult TTableClient::TImpl::CommitTransaction(const TSession& session, const TTransaction& tx,
+ const TCommitTxSettings& settings)
+{
+ auto request = MakeOperationRequest<Ydb::Table::CommitTransactionRequest>(settings);
+ request.set_session_id(session.GetId());
+ request.set_tx_id(tx.GetId());
+ request.set_collect_stats(GetStatsCollectionMode(settings.CollectQueryStats_));
+
+ auto promise = NewPromise<TCommitTransactionResult>();
+
+ auto extractor = [promise]
+ (google::protobuf::Any* any, TPlainStatus status) mutable {
+ TMaybe<TQueryStats> queryStats;
+ if (any) {
+ Ydb::Table::CommitTransactionResult result;
+ any->UnpackTo(&result);
+
+ if (result.has_query_stats()) {
+ queryStats = TQueryStats(result.query_stats());
+ }
+ }
+
+ TCommitTransactionResult commitTxResult(TStatus(std::move(status)), queryStats);
+ promise.SetValue(std::move(commitTxResult));
+ };
+
+ Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::CommitTransactionRequest, Ydb::Table::CommitTransactionResponse>(
+ std::move(request),
+ extractor,
+ &Ydb::Table::V1::TableService::Stub::AsyncCommitTransaction,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings),
+ session.SessionImpl_->GetEndpointKey());
+
+ return promise.GetFuture();
+}
+
+TAsyncStatus TTableClient::TImpl::RollbackTransaction(const TSession& session, const TTransaction& tx,
+ const TRollbackTxSettings& settings)
+{
+ auto request = MakeOperationRequest<Ydb::Table::RollbackTransactionRequest>(settings);
+ request.set_session_id(session.GetId());
+ request.set_tx_id(tx.GetId());
+
+ return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::RollbackTransactionRequest, Ydb::Table::RollbackTransactionResponse>(
+ std::move(request),
+ &Ydb::Table::V1::TableService::Stub::AsyncRollbackTransaction,
+ TRpcRequestSettings::Make(settings),
+ session.SessionImpl_->GetEndpointKey());
+}
+
+TAsyncExplainDataQueryResult TTableClient::TImpl::ExplainDataQuery(const TSession& session, const TString& query,
+ const TExplainDataQuerySettings& settings)
+{
+ auto request = MakeOperationRequest<Ydb::Table::ExplainDataQueryRequest>(settings);
+ request.set_session_id(session.GetId());
+ request.set_yql_text(query);
+
+ auto promise = NewPromise<TExplainQueryResult>();
+
+ auto extractor = [promise]
+ (google::protobuf::Any* any, TPlainStatus status) mutable {
+ TString ast;
+ TString plan;
+ if (any) {
+ Ydb::Table::ExplainQueryResult result;
+ any->UnpackTo(&result);
+ ast = result.query_ast();
+ plan = result.query_plan();
+ }
+ TExplainQueryResult val(TStatus(std::move(status)),
+ std::move(plan), std::move(ast));
+ promise.SetValue(std::move(val));
+ };
+
+ Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::ExplainDataQueryRequest, Ydb::Table::ExplainDataQueryResponse>(
+ std::move(request),
+ extractor,
+ &Ydb::Table::V1::TableService::Stub::AsyncExplainDataQuery,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings),
+ session.SessionImpl_->GetEndpointKey());
+
+ return promise.GetFuture();
+}
+
+void TTableClient::TImpl::SetTypedValue(Ydb::TypedValue* protoValue, const TValue& value) {
+ protoValue->mutable_type()->CopyFrom(TProtoAccessor::GetProto(value.GetType()));
+ protoValue->mutable_value()->CopyFrom(TProtoAccessor::GetProto(value));
+}
+
+NThreading::TFuture<std::pair<TPlainStatus, TTableClient::TImpl::TReadTableStreamProcessorPtr>> TTableClient::TImpl::ReadTable(
+ const TString& sessionId,
+ const TString& path,
+ const TReadTableSettings& settings)
+{
+ auto request = MakeRequest<Ydb::Table::ReadTableRequest>();
+ request.set_session_id(sessionId);
+ request.set_path(path);
+ request.set_ordered(settings.Ordered_);
+ if (settings.RowLimit_) {
+ request.set_row_limit(settings.RowLimit_.GetRef());
+ }
+ for (const auto& col : settings.Columns_) {
+ request.add_columns(col);
+ }
+ if (settings.UseSnapshot_) {
+ request.set_use_snapshot(
+ settings.UseSnapshot_.GetRef()
+ ? Ydb::FeatureFlag::ENABLED
+ : Ydb::FeatureFlag::DISABLED);
+ }
+
+ if (settings.From_) {
+ const auto& from = settings.From_.GetRef();
+ if (from.IsInclusive()) {
+ SetTypedValue(request.mutable_key_range()->mutable_greater_or_equal(), from.GetValue());
+ } else {
+ SetTypedValue(request.mutable_key_range()->mutable_greater(), from.GetValue());
+ }
+ }
+
+ if (settings.To_) {
+ const auto& to = settings.To_.GetRef();
+ if (to.IsInclusive()) {
+ SetTypedValue(request.mutable_key_range()->mutable_less_or_equal(), to.GetValue());
+ } else {
+ SetTypedValue(request.mutable_key_range()->mutable_less(), to.GetValue());
+ }
+ }
+
+ auto promise = NewPromise<std::pair<TPlainStatus, TReadTableStreamProcessorPtr>>();
+
+ Connections_->StartReadStream<Ydb::Table::V1::TableService, Ydb::Table::ReadTableRequest, Ydb::Table::ReadTableResponse>(
+ std::move(request),
+ [promise] (TPlainStatus status, TReadTableStreamProcessorPtr processor) mutable {
+ promise.SetValue(std::make_pair(status, processor));
+ },
+ &Ydb::Table::V1::TableService::Stub::AsyncStreamReadTable,
+ DbDriverState_,
+ TRpcRequestSettings::Make(settings));
+
+ return promise.GetFuture();
+
+}
+
+TAsyncReadRowsResult TTableClient::TImpl::ReadRows(const TString& path, TValue&& keys, const TReadRowsSettings& settings) {
+ auto request = MakeRequest<Ydb::Table::ReadRowsRequest>();
+ request.set_path(path);
+ auto* protoKeys = request.mutable_keys();
+ *protoKeys->mutable_type() = TProtoAccessor::GetProto(keys.GetType());
+ *protoKeys->mutable_value() = TProtoAccessor::GetProto(keys);
+
+ auto promise = NewPromise<TReadRowsResult>();
+
+ auto responseCb = [promise]
+ (Ydb::Table::ReadRowsResponse* response, TPlainStatus status) mutable {
+ Y_VERIFY(response);
+ TResultSet resultSet = TResultSet(response->result_set());
+ TReadRowsResult val(TStatus(std::move(status)), std::move(resultSet));
+ promise.SetValue(std::move(val));
+ };
+
+ Connections_->Run<Ydb::Table::V1::TableService, Ydb::Table::ReadRowsRequest, Ydb::Table::ReadRowsResponse>(
+ std::move(request),
+ responseCb,
+ &Ydb::Table::V1::TableService::Stub::AsyncReadRows,
+ DbDriverState_,
+ TRpcRequestSettings::Make(settings), // requestSettings
+ TEndpointKey() // preferredEndpoint
+ );
+
+ return promise.GetFuture();
+}
+
+TAsyncStatus TTableClient::TImpl::Close(const TSession::TImpl* sessionImpl, const TCloseSessionSettings& settings) {
+ auto request = MakeOperationRequest<Ydb::Table::DeleteSessionRequest>(settings);
+ request.set_session_id(sessionImpl->GetId());
+ return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::DeleteSessionRequest, Ydb::Table::DeleteSessionResponse>(
+ std::move(request),
+ &Ydb::Table::V1::TableService::Stub::AsyncDeleteSession,
+ TRpcRequestSettings::Make(settings),
+ sessionImpl->GetEndpointKey());
+}
+
+TAsyncStatus TTableClient::TImpl::CloseInternal(const TSession::TImpl* sessionImpl) {
+ static const auto internalCloseSessionSettings = TCloseSessionSettings()
+ .ClientTimeout(TDuration::Seconds(2));
+
+ auto driver = Connections_;
+ return Close(sessionImpl, internalCloseSessionSettings)
+ .Apply([driver{std::move(driver)}](TAsyncStatus status) mutable
+ {
+ driver.reset();
+ return status;
+ });
+}
+
+bool TTableClient::TImpl::ReturnSession(TSession::TImpl* sessionImpl) {
+ Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_ACTIVE ||
+ sessionImpl->GetState() == TSession::TImpl::S_IDLE);
+
+ if (RequestMigrator_.DoCheckAndMigrate(sessionImpl)) {
+ SessionRemovedDueBalancing.Inc();
+ return false;
+ }
+
+ bool needUpdateCounter = sessionImpl->NeedUpdateActiveCounter();
+ // Also removes NeedUpdateActiveCounter flag
+ sessionImpl->MarkIdle();
+ sessionImpl->SetTimeInterval(TDuration::Zero());
+ if (!SessionPool_.ReturnSession(sessionImpl, needUpdateCounter, shared_from_this())) {
+ sessionImpl->SetNeedUpdateActiveCounter(needUpdateCounter);
+ return false;
+ }
+ return true;
+}
+
+void TTableClient::TImpl::DeleteSession(TSession::TImpl* sessionImpl) {
+ // Closing not owned by session pool session should not fire getting new session
+ if (sessionImpl->IsOwnedBySessionPool()) {
+ if (SessionPool_.CheckAndFeedWaiterNewSession(shared_from_this())) {
+ // We requested new session for waiter which already incremented
+ // active session counter and old session will be deleted
+ // - skip update active counter in this case
+ sessionImpl->SetNeedUpdateActiveCounter(false);
+ }
+ }
+
+ if (sessionImpl->NeedUpdateActiveCounter()) {
+ SessionPool_.DecrementActiveCounter();
+ }
+
+ if (sessionImpl->GetId()) {
+ CloseInternal(sessionImpl);
+ DbDriverState_->StatCollector.DecSessionsOnHost(sessionImpl->GetEndpoint());
+ }
+
+ delete sessionImpl;
+}
+
+ui32 TTableClient::TImpl::GetSessionRetryLimit() const {
+ return Settings_.SessionPoolSettings_.RetryLimit_;
+}
+
+void TTableClient::TImpl::SetStatCollector(const NSdkStats::TStatCollector::TClientStatCollector& collector) {
+ CacheMissCounter.Set(collector.CacheMiss);
+ QuerySizeHistogram.Set(collector.QuerySize);
+ ParamsSizeHistogram.Set(collector.ParamsSize);
+ RetryOperationStatCollector = collector.RetryOperationStatCollector;
+ SessionRemovedDueBalancing.Set(collector.SessionRemovedDueBalancing);
+ RequestMigrated.Set(collector.RequestMigrated);
+}
+
+TAsyncBulkUpsertResult TTableClient::TImpl::BulkUpsert(const TString& table, TValue&& rows, const TBulkUpsertSettings& settings) {
+ auto request = MakeOperationRequest<Ydb::Table::BulkUpsertRequest>(settings);
+ request.set_table(table);
+ *request.mutable_rows()->mutable_type() = TProtoAccessor::GetProto(rows.GetType());
+ *request.mutable_rows()->mutable_value() = std::move(rows.GetProto());
+
+ auto promise = NewPromise<TBulkUpsertResult>();
+
+ auto extractor = [promise]
+ (google::protobuf::Any* any, TPlainStatus status) mutable {
+ Y_UNUSED(any);
+ TBulkUpsertResult val(TStatus(std::move(status)));
+ promise.SetValue(std::move(val));
+ };
+
+ Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::BulkUpsertRequest, Ydb::Table::BulkUpsertResponse>(
+ std::move(request),
+ extractor,
+ &Ydb::Table::V1::TableService::Stub::AsyncBulkUpsert,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings));
+
+ return promise.GetFuture();
+}
+
+TAsyncBulkUpsertResult TTableClient::TImpl::BulkUpsert(const TString& table, EDataFormat format,
+ const TString& data, const TString& schema, const TBulkUpsertSettings& settings)
+{
+ auto request = MakeOperationRequest<Ydb::Table::BulkUpsertRequest>(settings);
+ request.set_table(table);
+ if (format == EDataFormat::ApacheArrow) {
+ request.mutable_arrow_batch_settings()->set_schema(schema);
+ } else if (format == EDataFormat::CSV) {
+ auto* csv_settings = request.mutable_csv_settings();
+ const auto& format_settings = settings.FormatSettings_;
+ if (!format_settings.empty()) {
+ bool ok = csv_settings->ParseFromString(format_settings);
+ if (!ok) {
+ return {};
+ }
+ }
+ }
+ request.set_data(data);
+
+ auto promise = NewPromise<TBulkUpsertResult>();
+
+ auto extractor = [promise]
+ (google::protobuf::Any* any, TPlainStatus status) mutable {
+ Y_UNUSED(any);
+ TBulkUpsertResult val(TStatus(std::move(status)));
+ promise.SetValue(std::move(val));
+ };
+
+ Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::BulkUpsertRequest, Ydb::Table::BulkUpsertResponse>(
+ std::move(request),
+ extractor,
+ &Ydb::Table::V1::TableService::Stub::AsyncBulkUpsert,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings));
+
+ return promise.GetFuture();
+}
+
+TFuture<std::pair<TPlainStatus, TTableClient::TImpl::TScanQueryProcessorPtr>> TTableClient::TImpl::StreamExecuteScanQueryInternal(const TString& query,
+ const ::google::protobuf::Map<TString, Ydb::TypedValue>* params,
+ const TStreamExecScanQuerySettings& settings)
+{
+ auto request = MakeRequest<Ydb::Table::ExecuteScanQueryRequest>();
+ request.mutable_query()->set_yql_text(query);
+
+ if (params) {
+ *request.mutable_parameters() = *params;
+ }
+
+ if (settings.Explain_) {
+ request.set_mode(Ydb::Table::ExecuteScanQueryRequest::MODE_EXPLAIN);
+ } else {
+ request.set_mode(Ydb::Table::ExecuteScanQueryRequest::MODE_EXEC);
+ }
+
+ request.set_collect_stats(GetStatsCollectionMode(settings.CollectQueryStats_));
+
+ auto promise = NewPromise<std::pair<TPlainStatus, TScanQueryProcessorPtr>>();
+
+ Connections_->StartReadStream<
+ Ydb::Table::V1::TableService,
+ Ydb::Table::ExecuteScanQueryRequest,
+ Ydb::Table::ExecuteScanQueryPartialResponse>
+ (
+ std::move(request),
+ [promise] (TPlainStatus status, TScanQueryProcessorPtr processor) mutable {
+ promise.SetValue(std::make_pair(status, processor));
+ },
+ &Ydb::Table::V1::TableService::Stub::AsyncStreamExecuteScanQuery,
+ DbDriverState_,
+ TRpcRequestSettings::Make(settings)
+ );
+
+ return promise.GetFuture();
+}
+
+TAsyncScanQueryPartIterator TTableClient::TImpl::StreamExecuteScanQuery(const TString& query,
+ const ::google::protobuf::Map<TString, Ydb::TypedValue>* params,
+ const TStreamExecScanQuerySettings& settings)
+{
+ auto promise = NewPromise<TScanQueryPartIterator>();
+
+ auto iteratorCallback = [promise](TFuture<std::pair<TPlainStatus,
+ TTableClient::TImpl::TScanQueryProcessorPtr>> future) mutable
+ {
+ Y_ASSERT(future.HasValue());
+ auto pair = future.ExtractValue();
+ promise.SetValue(TScanQueryPartIterator(
+ pair.second
+ ? std::make_shared<TScanQueryPartIterator::TReaderImpl>(pair.second, pair.first.Endpoint)
+ : nullptr,
+ std::move(pair.first))
+ );
+ };
+
+ StreamExecuteScanQueryInternal(query, params, settings).Subscribe(iteratorCallback);
+ return promise.GetFuture();
+}
+
+// void TTableClient::TImpl::CloseAndDeleteSession(
+// std::unique_ptr<TSession::TImpl>&& impl,
+// std::shared_ptr<TTableClient::TImpl> client);
+
+
+void TTableClient::TImpl::SetParams(
+ ::google::protobuf::Map<TString, Ydb::TypedValue>* params,
+ Ydb::Table::ExecuteDataQueryRequest* request)
+{
+ if (params) {
+ request->mutable_parameters()->swap(*params);
+ }
+}
+
+void TTableClient::TImpl::SetParams(
+ const ::google::protobuf::Map<TString, Ydb::TypedValue>& params,
+ Ydb::Table::ExecuteDataQueryRequest* request)
+{
+ *request->mutable_parameters() = params;
+}
+
+void TTableClient::TImpl::CollectParams(
+ ::google::protobuf::Map<TString, Ydb::TypedValue>* params,
+ NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> histgoram)
+{
+
+ if (params && histgoram.IsCollecting()) {
+ size_t size = 0;
+ for (auto& keyvalue: *params) {
+ size += keyvalue.second.ByteSizeLong();
+ }
+ histgoram.Record(size);
+ }
+}
+
+void TTableClient::TImpl::CollectParams(
+ const ::google::protobuf::Map<TString, Ydb::TypedValue>& params,
+ NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> histgoram)
+{
+
+ if (histgoram.IsCollecting()) {
+ size_t size = 0;
+ for (auto& keyvalue: params) {
+ size += keyvalue.second.ByteSizeLong();
+ }
+ histgoram.Record(size);
+ }
+}
+
+void TTableClient::TImpl::CollectQuerySize(const TString& query, NSdkStats::TAtomicHistogram<::NMonitoring::THistogram>& querySizeHistogram) {
+ if (querySizeHistogram.IsCollecting()) {
+ querySizeHistogram.Record(query.size());
+ }
+}
+
+void TTableClient::TImpl::CollectQuerySize(const TDataQuery&, NSdkStats::TAtomicHistogram<::NMonitoring::THistogram>&) {}
+
+void TTableClient::TImpl::SetTxSettings(const TTxSettings& txSettings, Ydb::Table::TransactionSettings* proto)
+{
+ switch (txSettings.Mode_) {
+ case TTxSettings::TS_SERIALIZABLE_RW:
+ proto->mutable_serializable_read_write();
+ break;
+ case TTxSettings::TS_ONLINE_RO:
+ proto->mutable_online_read_only()->set_allow_inconsistent_reads(
+ txSettings.OnlineSettings_.AllowInconsistentReads_);
+ break;
+ case TTxSettings::TS_STALE_RO:
+ proto->mutable_stale_read_only();
+ break;
+ case TTxSettings::TS_SNAPSHOT_RO:
+ proto->mutable_snapshot_read_only();
+ break;
+ default:
+ throw TContractViolation("Unexpected transaction mode.");
+ }
+}
+
+void TTableClient::TImpl::SetQuery(const TString& queryText, Ydb::Table::Query* query) {
+ query->set_yql_text(queryText);
+}
+
+void TTableClient::TImpl::SetQuery(const TDataQuery& queryData, Ydb::Table::Query* query) {
+ query->set_id(queryData.GetId());
+}
+
+void TTableClient::TImpl::SetQueryCachePolicy(const TString&, const TExecDataQuerySettings& settings,
+ Ydb::Table::QueryCachePolicy* queryCachePolicy)
+{
+ queryCachePolicy->set_keep_in_cache(settings.KeepInQueryCache_ ? settings.KeepInQueryCache_.GetRef() : false);
+}
+
+void TTableClient::TImpl::SetQueryCachePolicy(const TDataQuery&, const TExecDataQuerySettings& settings,
+ Ydb::Table::QueryCachePolicy* queryCachePolicy) {
+ queryCachePolicy->set_keep_in_cache(settings.KeepInQueryCache_ ? settings.KeepInQueryCache_.GetRef() : true);
+}
+
+TMaybe<TString> TTableClient::TImpl::GetQueryText(const TString& queryText) {
+ return queryText;
+}
+
+TMaybe<TString> TTableClient::TImpl::GetQueryText(const TDataQuery& queryData) {
+ return queryData.GetText();
+}
+
+}
+}
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h
new file mode 100644
index 0000000000..5c157465b1
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h
@@ -0,0 +1,369 @@
+#pragma once
+
+#define INCLUDE_YDB_INTERNAL_H
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/scheme_helpers/helpers.h>
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/table_helpers/helpers.h>
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/make_request/make.h>
+#undef INCLUDE_YDB_INTERNAL_H
+
+#include <ydb/public/sdk/cpp/client/resources/ydb_resources.h>
+#include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h>
+#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
+#include <ydb/public/api/grpc/ydb_table_v1.grpc.pb.h>
+
+#include <util/random/random.h>
+
+#include "client_session.h"
+#include "data_query.h"
+#include "request_migrator.h"
+#include "readers.h"
+#include "session_pool.h"
+
+
+namespace NYdb {
+namespace NTable {
+
+using namespace NThreading;
+
+
+//How often run session pool keep alive check
+constexpr TDuration PERIODIC_ACTION_INTERVAL = TDuration::Seconds(5);
+//How ofter run host scan to perform session balancing
+constexpr TDuration HOSTSCAN_PERIODIC_ACTION_INTERVAL = TDuration::Seconds(2);
+constexpr ui64 KEEP_ALIVE_RANDOM_FRACTION = 4;
+constexpr ui32 MAX_BACKOFF_DURATION_MS = TDuration::Hours(1).MilliSeconds();
+constexpr TDuration KEEP_ALIVE_CLIENT_TIMEOUT = TDuration::Seconds(5);
+
+
+TDuration GetMinTimeToTouch(const TSessionPoolSettings& settings);
+TDuration GetMaxTimeToTouch(const TSessionPoolSettings& settings);
+ui32 CalcBackoffTime(const TBackoffSettings& settings, ui32 retryNumber);
+bool IsSessionCloseRequested(const TStatus& status);
+TDuration RandomizeThreshold(TDuration duration);
+TDuration GetMinTimeToTouch(const TSessionPoolSettings& settings);
+TDuration GetMaxTimeToTouch(const TSessionPoolSettings& settings);
+TStatus GetStatus(const TOperation& operation);
+TStatus GetStatus(const TStatus& status);
+
+
+template<typename TResponse>
+NThreading::TFuture<TResponse> InjectSessionStatusInterception(
+ std::shared_ptr<TSession::TImpl>& impl, NThreading::TFuture<TResponse> asyncResponse,
+ bool updateTimeout,
+ TDuration timeout,
+ std::function<void(const TResponse&, TSession::TImpl&)> cb = {})
+{
+ auto promise = NThreading::NewPromise<TResponse>();
+ asyncResponse.Subscribe([impl, promise, cb, updateTimeout, timeout](NThreading::TFuture<TResponse> future) mutable {
+ Y_VERIFY(future.HasValue());
+
+ // TResponse can hold refcounted user provided data (TSession for example)
+ // and we do not want to have copy of it (for example it can cause delay dtor call)
+ // so using move semantic here is mandatory.
+ // Also we must reset captured shared pointer to session impl
+ TResponse value = std::move(future.ExtractValue());
+
+ const TStatus& status = GetStatus(value);
+ // Exclude CLIENT_RESOURCE_EXHAUSTED from transport errors which can cause to session disconnect
+ // since we have guarantee this request wasn't been started to execute.
+
+ if (status.IsTransportError() && status.GetStatus() != EStatus::CLIENT_RESOURCE_EXHAUSTED) {
+ impl->MarkBroken();
+ } else if (status.GetStatus() == EStatus::SESSION_BUSY) {
+ impl->MarkBroken();
+ } else if (status.GetStatus() == EStatus::BAD_SESSION) {
+ impl->MarkBroken();
+ } else if (IsSessionCloseRequested(status)) {
+ impl->MarkAsClosing();
+ } else {
+ // NOTE: About GetState and lock
+ // Simultanious call multiple requests on the same session make no sence, due to server limitation.
+ // But user can perform this call, right now we do not protect session from this, it may cause
+ // raise on session state if respoise is not success.
+ // It should not be a problem - in case of this race we close session
+ // or put it in to settler.
+ if (updateTimeout && status.GetStatus() != EStatus::CLIENT_RESOURCE_EXHAUSTED) {
+ impl->ScheduleTimeToTouch(RandomizeThreshold(timeout), impl->GetState() == TSession::TImpl::EState::S_ACTIVE);
+ }
+ }
+ if (cb) {
+ cb(value, *impl);
+ }
+ impl.reset();
+ promise.SetValue(std::move(value));
+ });
+ return promise.GetFuture();
+}
+
+
+class TTableClient::TImpl: public TClientImplCommon<TTableClient::TImpl>, public IMigratorClient {
+public:
+ using TReadTableStreamProcessorPtr = TTablePartIterator::TReaderImpl::TStreamProcessorPtr;
+ using TScanQueryProcessorPtr = TScanQueryPartIterator::TReaderImpl::TStreamProcessorPtr;
+
+ TImpl(std::shared_ptr<TGRpcConnectionsImpl>&& connections, const TClientSettings& settings);
+ ~TImpl();
+
+ bool LinkObjToEndpoint(const TEndpointKey& endpoint, TEndpointObj* obj, const void* tag);
+ void InitStopper();
+ NThreading::TFuture<void> Drain();
+ NThreading::TFuture<void> Stop();
+ void ScheduleTask(const std::function<void()>& fn, TDuration timeout);
+ void ScheduleTaskUnsafe(std::function<void()>&& fn, TDuration timeout);
+ void AsyncBackoff(const TBackoffSettings& settings, ui32 retryNumber, const std::function<void()>& fn);
+ void StartPeriodicSessionPoolTask();
+ static ui64 ScanForeignLocations(std::shared_ptr<TTableClient::TImpl> client);
+ static std::pair<ui64, size_t> ScanLocation(std::shared_ptr<TTableClient::TImpl> client,
+ std::unordered_map<ui64, size_t>& sessions, bool allNodes);
+ static NMath::TStats CalcCV(const std::unordered_map<ui64, size_t>& in);
+ void StartPeriodicHostScanTask();
+
+ TAsyncCreateSessionResult GetSession(const TCreateSessionSettings& settings);
+ i64 GetActiveSessionCount() const;
+ i64 GetActiveSessionsLimit() const;
+ i64 GetCurrentPoolSize() const;
+ TAsyncCreateSessionResult CreateSession(const TCreateSessionSettings& settings, bool standalone,
+ TString preferedLocation = TString());
+ TAsyncKeepAliveResult KeepAlive(const TSession::TImpl* session, const TKeepAliveSettings& settings);
+
+ TFuture<TStatus> CreateTable(Ydb::Table::CreateTableRequest&& request, const TCreateTableSettings& settings);
+ TFuture<TStatus> AlterTable(Ydb::Table::AlterTableRequest&& request, const TAlterTableSettings& settings);
+ TAsyncOperation AlterTableLong(Ydb::Table::AlterTableRequest&& request, const TAlterTableSettings& settings);
+ TFuture<TStatus> CopyTable(const TString& sessionId, const TString& src, const TString& dst,
+ const TCopyTableSettings& settings);
+ TFuture<TStatus> CopyTables(Ydb::Table::CopyTablesRequest&& request, const TCopyTablesSettings& settings);
+ TFuture<TStatus> RenameTables(Ydb::Table::RenameTablesRequest&& request, const TRenameTablesSettings& settings);
+ TFuture<TStatus> DropTable(const TString& sessionId, const TString& path, const TDropTableSettings& settings);
+ TAsyncDescribeTableResult DescribeTable(const TString& sessionId, const TString& path, const TDescribeTableSettings& settings);
+
+ template<typename TParamsType>
+ TAsyncDataQueryResult ExecuteDataQuery(TSession& session, const TString& query, const TTxControl& txControl,
+ TParamsType params, const TExecDataQuerySettings& settings) {
+ auto maybeQuery = session.SessionImpl_->GetQueryFromCache(query, Settings_.AllowRequestMigration_);
+ if (maybeQuery) {
+ TDataQuery dataQuery(session, query, maybeQuery->QueryId, maybeQuery->ParameterTypes);
+ return ExecuteDataQuery(session, dataQuery, txControl, params, settings, true);
+ }
+
+ CacheMissCounter.Inc();
+
+ return InjectSessionStatusInterception(session.SessionImpl_,
+ ExecuteDataQueryInternal(session, query, txControl, params, settings, false),
+ true, GetMinTimeToTouch(Settings_.SessionPoolSettings_));
+ }
+
+ template<typename TParamsType>
+ TAsyncDataQueryResult ExecuteDataQuery(TSession& session, const TDataQuery& dataQuery, const TTxControl& txControl,
+ TParamsType params, const TExecDataQuerySettings& settings,
+ bool fromCache) {
+ TString queryKey = dataQuery.Impl_->GetTextHash();
+ auto cb = [queryKey](const TDataQueryResult& result, TSession::TImpl& session) {
+ if (result.GetStatus() == EStatus::NOT_FOUND) {
+ session.InvalidateQueryInCache(queryKey);
+ }
+ };
+
+ return InjectSessionStatusInterception<TDataQueryResult>(
+ session.SessionImpl_,
+ session.Client_->ExecuteDataQueryInternal(session, dataQuery, txControl, params, settings, fromCache),
+ true,
+ GetMinTimeToTouch(session.Client_->Settings_.SessionPoolSettings_),
+ cb);
+ }
+
+ TAsyncPrepareQueryResult PrepareDataQuery(const TSession& session, const TString& query,
+ const TPrepareDataQuerySettings& settings);
+ TAsyncStatus ExecuteSchemeQuery(const TString& sessionId, const TString& query,
+ const TExecSchemeQuerySettings& settings);
+
+ TAsyncBeginTransactionResult BeginTransaction(const TSession& session, const TTxSettings& txSettings,
+ const TBeginTxSettings& settings);
+ TAsyncCommitTransactionResult CommitTransaction(const TSession& session, const TTransaction& tx,
+ const TCommitTxSettings& settings);
+ TAsyncStatus RollbackTransaction(const TSession& session, const TTransaction& tx,
+ const TRollbackTxSettings& settings);
+
+ TAsyncExplainDataQueryResult ExplainDataQuery(const TSession& session, const TString& query,
+ const TExplainDataQuerySettings& settings);
+
+ static void SetTypedValue(Ydb::TypedValue* protoValue, const TValue& value);
+
+ NThreading::TFuture<std::pair<TPlainStatus, TReadTableStreamProcessorPtr>> ReadTable(
+ const TString& sessionId,
+ const TString& path,
+ const TReadTableSettings& settings);
+ TAsyncReadRowsResult ReadRows(const TString& path, TValue&& keys, const TReadRowsSettings& settings);
+
+ TAsyncStatus Close(const TSession::TImpl* sessionImpl, const TCloseSessionSettings& settings);
+ TAsyncStatus CloseInternal(const TSession::TImpl* sessionImpl);
+
+ bool ReturnSession(TSession::TImpl* sessionImpl);
+ void DeleteSession(TSession::TImpl* sessionImpl);
+ ui32 GetSessionRetryLimit() const;
+ static void CloseAndDeleteSession(
+ std::unique_ptr<TSession::TImpl>&& impl,
+ std::shared_ptr<TTableClient::TImpl> client);
+
+
+ void SetStatCollector(const NSdkStats::TStatCollector::TClientStatCollector& collector);
+
+ TAsyncBulkUpsertResult BulkUpsert(const TString& table, TValue&& rows, const TBulkUpsertSettings& settings);
+ TAsyncBulkUpsertResult BulkUpsert(const TString& table, EDataFormat format,
+ const TString& data, const TString& schema, const TBulkUpsertSettings& settings);
+
+ TFuture<std::pair<TPlainStatus, TScanQueryProcessorPtr>> StreamExecuteScanQueryInternal(const TString& query,
+ const ::google::protobuf::Map<TString, Ydb::TypedValue>* params,
+ const TStreamExecScanQuerySettings& settings);
+ TAsyncScanQueryPartIterator StreamExecuteScanQuery(const TString& query,
+ const ::google::protobuf::Map<TString, Ydb::TypedValue>* params,
+ const TStreamExecScanQuerySettings& settings);
+public:
+ TClientSettings Settings_;
+
+private:
+ static void SetParams(
+ ::google::protobuf::Map<TString, Ydb::TypedValue>* params,
+ Ydb::Table::ExecuteDataQueryRequest* request);
+
+ static void SetParams(
+ const ::google::protobuf::Map<TString, Ydb::TypedValue>& params,
+ Ydb::Table::ExecuteDataQueryRequest* request);
+
+ static void CollectParams(
+ ::google::protobuf::Map<TString, Ydb::TypedValue>* params,
+ NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> histgoram);
+
+ static void CollectParams(
+ const ::google::protobuf::Map<TString, Ydb::TypedValue>& params,
+ NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> histgoram);
+
+ static void CollectQuerySize(const TString& query, NSdkStats::TAtomicHistogram<::NMonitoring::THistogram>& querySizeHistogram);
+
+ static void CollectQuerySize(const TDataQuery&, NSdkStats::TAtomicHistogram<::NMonitoring::THistogram>&);
+
+ template <typename TQueryType, typename TParamsType>
+ TAsyncDataQueryResult ExecuteDataQueryInternal(const TSession& session, const TQueryType& query,
+ const TTxControl& txControl, TParamsType params,
+ const TExecDataQuerySettings& settings, bool fromCache
+ ) {
+ auto request = MakeOperationRequest<Ydb::Table::ExecuteDataQueryRequest>(settings);
+ request.set_session_id(session.GetId());
+ auto txControlProto = request.mutable_tx_control();
+ txControlProto->set_commit_tx(txControl.CommitTx_);
+ if (txControl.TxId_) {
+ txControlProto->set_tx_id(*txControl.TxId_);
+ } else {
+ SetTxSettings(txControl.BeginTx_, txControlProto->mutable_begin_tx());
+ }
+
+ request.set_collect_stats(GetStatsCollectionMode(settings.CollectQueryStats_));
+
+ SetQuery(query, request.mutable_query());
+ CollectQuerySize(query, QuerySizeHistogram);
+
+ SetParams(params, &request);
+ CollectParams(params, ParamsSizeHistogram);
+
+ SetQueryCachePolicy(query, settings, request.mutable_query_cache_policy());
+
+ auto promise = NewPromise<TDataQueryResult>();
+ bool keepInCache = settings.KeepInQueryCache_ && settings.KeepInQueryCache_.GetRef();
+
+ // We don't want to delay call of TSession dtor, so we can't capture it by copy
+ // otherwise we break session pool and other clients logic.
+ // Same problem with TDataQuery and TTransaction
+ //
+ // The fast solution is:
+ // - create copy of TSession out of lambda
+ // - capture pointer
+ // - call free just before SetValue call
+ auto sessionPtr = new TSession(session);
+ auto extractor = [promise, sessionPtr, query, fromCache, keepInCache]
+ (google::protobuf::Any* any, TPlainStatus status) mutable {
+ TVector<TResultSet> res;
+ TMaybe<TTransaction> tx;
+ TMaybe<TDataQuery> dataQuery;
+ TMaybe<TQueryStats> queryStats;
+
+ auto queryText = GetQueryText(query);
+ if (any) {
+ Ydb::Table::ExecuteQueryResult result;
+ any->UnpackTo(&result);
+
+ for (size_t i = 0; i < result.result_setsSize(); i++) {
+ res.push_back(TResultSet(*result.mutable_result_sets(i)));
+ }
+
+ if (result.has_tx_meta()) {
+ tx = TTransaction(*sessionPtr, result.tx_meta().id());
+ }
+
+ if (result.has_query_meta()) {
+ if (queryText) {
+ auto& query_meta = result.query_meta();
+ dataQuery = TDataQuery(*sessionPtr, *queryText, query_meta.id(), query_meta.parameters_types());
+ }
+ }
+
+ if (result.has_query_stats()) {
+ queryStats = TQueryStats(result.query_stats());
+ }
+ }
+
+ if (keepInCache && dataQuery && queryText) {
+ sessionPtr->SessionImpl_->AddQueryToCache(*dataQuery);
+ }
+
+ TDataQueryResult dataQueryResult(TStatus(std::move(status)),
+ std::move(res), tx, dataQuery, fromCache, queryStats);
+
+ delete sessionPtr;
+ tx.Clear();
+ dataQuery.Clear();
+ promise.SetValue(std::move(dataQueryResult));
+ };
+
+ Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::ExecuteDataQueryRequest, Ydb::Table::ExecuteDataQueryResponse>(
+ std::move(request),
+ extractor,
+ &Ydb::Table::V1::TableService::Stub::AsyncExecuteDataQuery,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings),
+ session.SessionImpl_->GetEndpointKey());
+
+ return promise.GetFuture();
+ }
+
+ static void SetTxSettings(const TTxSettings& txSettings, Ydb::Table::TransactionSettings* proto);
+
+ static void SetQuery(const TString& queryText, Ydb::Table::Query* query);
+
+ static void SetQuery(const TDataQuery& queryData, Ydb::Table::Query* query);
+
+ static void SetQueryCachePolicy(const TString&, const TExecDataQuerySettings& settings,
+ Ydb::Table::QueryCachePolicy* queryCachePolicy);
+
+ static void SetQueryCachePolicy(const TDataQuery&, const TExecDataQuerySettings& settings,
+ Ydb::Table::QueryCachePolicy* queryCachePolicy);
+
+ static TMaybe<TString> GetQueryText(const TString& queryText);
+
+ static TMaybe<TString> GetQueryText(const TDataQuery& queryData);
+
+public:
+ NSdkStats::TAtomicCounter<::NMonitoring::TRate> CacheMissCounter;
+ NSdkStats::TStatCollector::TClientRetryOperationStatCollector RetryOperationStatCollector;
+ NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> QuerySizeHistogram;
+ NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> ParamsSizeHistogram;
+ NSdkStats::TAtomicCounter<::NMonitoring::TRate> SessionRemovedDueBalancing;
+ NSdkStats::TAtomicCounter<::NMonitoring::TRate> RequestMigrated;
+
+private:
+ TSessionPool SessionPool_;
+ TRequestMigrator RequestMigrator_;
+ static const TKeepAliveSettings KeepAliveSettings;
+};
+
+}
+}
diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp
index 976f8c8b5b..9ef4048e16 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp
@@ -9,12 +9,12 @@
#include <ydb/public/api/grpc/ydb_table_v1.grpc.pb.h>
#include <ydb/public/api/protos/ydb_table.pb.h>
#include <ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h>
-#include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h>
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
#include <ydb/public/sdk/cpp/client/ydb_value/value.h>
#include <ydb/public/sdk/cpp/client/ydb_table/impl/client_session.h>
#include <ydb/public/sdk/cpp/client/ydb_table/impl/data_query.h>
#include <ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h>
+#include <ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h>
#include <ydb/public/sdk/cpp/client/resources/ydb_resources.h>
#include <google/protobuf/util/time_util.h>
@@ -32,15 +32,6 @@ namespace NTable {
using namespace NThreading;
-//How often run session pool keep alive check
-constexpr TDuration PERIODIC_ACTION_INTERVAL = TDuration::Seconds(5);
-//How ofter run host scan to perform session balancing
-constexpr TDuration HOSTSCAN_PERIODIC_ACTION_INTERVAL = TDuration::Seconds(2);
-constexpr ui64 KEEP_ALIVE_RANDOM_FRACTION = 4;
-constexpr TDuration KEEP_ALIVE_CLIENT_TIMEOUT = TDuration::Seconds(5);
-constexpr ui64 PERIODIC_ACTION_BATCH_SIZE = 10; //Max number of tasks to perform during one interval
-constexpr ui32 MAX_BACKOFF_DURATION_MS = TDuration::Hours(1).MilliSeconds();
-
////////////////////////////////////////////////////////////////////////////////
class TStorageSettings::TImpl {
@@ -1234,64 +1225,6 @@ TTableDescription TTableBuilder::Build() {
return TableDescription_;
}
-////////////////////////////////////////////////////////////////////////////////
-
-class TTablePartIterator::TReaderImpl {
-public:
- using TSelf = TTablePartIterator::TReaderImpl;
- using TResponse = Ydb::Table::ReadTableResponse;
- using TStreamProcessorPtr = NGrpc::IStreamRequestReadProcessor<TResponse>::TPtr;
- using TReadCallback = NGrpc::IStreamRequestReadProcessor<TResponse>::TReadCallback;
- using TGRpcStatus = NGrpc::TGrpcStatus;
- using TBatchReadResult = std::pair<TResponse, TGRpcStatus>;
-
- TReaderImpl(TStreamProcessorPtr streamProcessor, const TString& endpoint)
- : StreamProcessor_(streamProcessor)
- , Finished_(false)
- , Endpoint_(endpoint)
- {}
-
- ~TReaderImpl() {
- StreamProcessor_->Cancel();
- }
-
- bool IsFinished() {
- return Finished_;
- }
-
- TAsyncSimpleStreamPart<TResultSet> ReadNext(std::shared_ptr<TSelf> self) {
- auto promise = NThreading::NewPromise<TSimpleStreamPart<TResultSet>>();
- // Capture self - guarantee no dtor call during the read
- auto readCb = [self, promise](TGRpcStatus&& grpcStatus) mutable {
- std::optional<TReadTableSnapshot> snapshot;
- if (self->Response_.has_snapshot()) {
- snapshot.emplace(
- self->Response_.snapshot().plan_step(),
- self->Response_.snapshot().tx_id());
- }
- if (!grpcStatus.Ok()) {
- self->Finished_ = true;
- promise.SetValue({TResultSet(std::move(*self->Response_.mutable_result()->mutable_result_set())),
- TStatus(TPlainStatus(grpcStatus, self->Endpoint_)),
- snapshot});
- } else {
- NYql::TIssues issues;
- NYql::IssuesFromMessage(self->Response_.issues(), issues);
- EStatus clientStatus = static_cast<EStatus>(self->Response_.status());
- promise.SetValue({TResultSet(std::move(*self->Response_.mutable_result()->mutable_result_set())),
- TStatus(clientStatus, std::move(issues)),
- snapshot});
- }
- };
- StreamProcessor_->Read(&Response_, readCb);
- return promise.GetFuture();
- }
-private:
- TStreamProcessorPtr StreamProcessor_;
- TResponse Response_;
- bool Finished_;
- TString Endpoint_;
-};
TTablePartIterator::TTablePartIterator(
std::shared_ptr<TReaderImpl> impl,
@@ -1306,69 +1239,6 @@ TAsyncSimpleStreamPart<TResultSet> TTablePartIterator::ReadNext() {
return ReaderImpl_->ReadNext(ReaderImpl_);
}
-////////////////////////////////////////////////////////////////////////////////
-
-class TScanQueryPartIterator::TReaderImpl {
-public:
- using TSelf = TScanQueryPartIterator::TReaderImpl;
- using TResponse = Ydb::Table::ExecuteScanQueryPartialResponse;
- using TStreamProcessorPtr = NGrpc::IStreamRequestReadProcessor<TResponse>::TPtr;
- using TReadCallback = NGrpc::IStreamRequestReadProcessor<TResponse>::TReadCallback;
- using TGRpcStatus = NGrpc::TGrpcStatus;
- using TBatchReadResult = std::pair<TResponse, TGRpcStatus>;
-
- TReaderImpl(TStreamProcessorPtr streamProcessor, const TString& endpoint)
- : StreamProcessor_(streamProcessor)
- , Finished_(false)
- , Endpoint_(endpoint)
- {}
-
- ~TReaderImpl() {
- StreamProcessor_->Cancel();
- }
-
- bool IsFinished() const {
- return Finished_;
- }
-
- TAsyncScanQueryPart ReadNext(std::shared_ptr<TSelf> self) {
- auto promise = NThreading::NewPromise<TScanQueryPart>();
- // Capture self - guarantee no dtor call during the read
- auto readCb = [self, promise](TGRpcStatus&& grpcStatus) mutable {
- if (!grpcStatus.Ok()) {
- self->Finished_ = true;
- promise.SetValue({TStatus(TPlainStatus(grpcStatus, self->Endpoint_))});
- } else {
- NYql::TIssues issues;
- NYql::IssuesFromMessage(self->Response_.issues(), issues);
- EStatus clientStatus = static_cast<EStatus>(self->Response_.status());
- // TODO: Add headers for streaming calls.
- TPlainStatus plainStatus{clientStatus, std::move(issues), self->Endpoint_, {}};
- TStatus status{std::move(plainStatus)};
- TMaybe<TQueryStats> queryStats;
-
- if (self->Response_.result().has_query_stats()) {
- queryStats = TQueryStats(self->Response_.result().query_stats());
- }
-
- if (self->Response_.result().has_result_set()) {
- promise.SetValue({std::move(status),
- TResultSet(std::move(*self->Response_.mutable_result()->mutable_result_set())), queryStats});
- } else {
- promise.SetValue({std::move(status), queryStats});
- }
- }
- };
- StreamProcessor_->Read(&Response_, readCb);
- return promise.GetFuture();
- }
-private:
- TStreamProcessorPtr StreamProcessor_;
- TResponse Response_;
- bool Finished_;
- TString Endpoint_;
-};
-
TScanQueryPartIterator::TScanQueryPartIterator(
std::shared_ptr<TReaderImpl> impl,
TPlainStatus&& status)
@@ -1382,1589 +1252,7 @@ TAsyncScanQueryPart TScanQueryPartIterator::ReadNext() {
return ReaderImpl_->ReadNext(ReaderImpl_);
}
-////////////////////////////////////////////////////////////////////////////////
-
-class TSessionPoolImpl {
- typedef TAsyncCreateSessionResult
- (*TAwareSessonProvider)
- (std::shared_ptr<TTableClient::TImpl> client, const TCreateSessionSettings& settings);
-public:
- using TKeepAliveCmd = std::function<void(TSession session)>;
- using TDeletePredicate = std::function<bool(TSession::TImpl* session, TTableClient::TImpl* client, size_t sessionsCount)>;
- TSessionPoolImpl(ui32 maxActiveSessions);
- // TAwareSessonProvider:
- // function is called if session pool is empty,
- // this is used for additional total session count limitation
- TAsyncCreateSessionResult GetSession(
- std::shared_ptr<TTableClient::TImpl> client,
- const TCreateSessionSettings& settings);
- // Returns true if session was extracted from session pool and dropped via smart deleter
- // Returns false if session for given endpoint was not found
- // NOTE: O(n) under session pool lock, should not be used often
- bool DropSessionOnEndpoint(std::shared_ptr<TTableClient::TImpl> client, ui64 nodeId);
- // Returns true if session returned to pool successfully
- bool ReturnSession(TSession::TImpl* impl, bool active);
- TPeriodicCb CreatePeriodicTask(std::weak_ptr<TTableClient::TImpl> weakClient, TKeepAliveCmd&& cmd, TDeletePredicate&& predicate);
- i64 GetActiveSessions() const;
- i64 GetActiveSessionsLimit() const;
- i64 GetCurrentPoolSize() const;
- void DecrementActiveCounter();
-
- void Drain(std::function<bool(std::unique_ptr<TSession::TImpl>&&)> cb, bool close);
- void SetStatCollector(NSdkStats::TStatCollector::TSessionPoolStatCollector collector);
-
- static void CreateFakeSession(NThreading::TPromise<TCreateSessionResult>& promise,
- std::shared_ptr<TTableClient::TImpl> client);
-private:
- void UpdateStats();
-
- mutable std::mutex Mtx_;
- TMultiMap<TInstant, std::unique_ptr<TSession::TImpl>> Sessions_;
- bool Closed_;
- i64 ActiveSessions_;
- const ui32 MaxActiveSessions_;
- NSdkStats::TSessionCounter ActiveSessionsCounter_;
- NSdkStats::TSessionCounter InPoolSessionsCounter_;
- NSdkStats::TAtomicCounter<::NMonitoring::TRate> FakeSessionsCounter_;
-};
-
-static TDuration RandomizeThreshold(TDuration duration) {
- TDuration::TValue value = duration.GetValue();
- if (KEEP_ALIVE_RANDOM_FRACTION) {
- const i64 randomLimit = value / KEEP_ALIVE_RANDOM_FRACTION;
- if (randomLimit < 2)
- return duration;
- value += static_cast<i64>(RandomNumber<ui64>(randomLimit));
- }
- return TDuration::FromValue(value);
-}
-
-static TDuration GetMinTimeToTouch(const TSessionPoolSettings& settings) {
- return Min(settings.CloseIdleThreshold_, settings.KeepAliveIdleThreshold_);
-}
-
-static TDuration GetMaxTimeToTouch(const TSessionPoolSettings& settings) {
- return Max(settings.CloseIdleThreshold_, settings.KeepAliveIdleThreshold_);
-}
-
-static TStatus GetStatus(const TOperation& operation) {
- return operation.Status();
-}
-
-static TStatus GetStatus(const TStatus& status) {
- return status;
-}
-
-static bool IsSessionCloseRequested(const TStatus& status) {
- const auto& meta = status.GetResponseMetadata();
- auto hints = meta.equal_range(NYdb::YDB_SERVER_HINTS);
- for(auto it = hints.first; it != hints.second; ++it) {
- if (it->second == NYdb::YDB_SESSION_CLOSE) {
- return true;
- }
- }
-
- return false;
-}
-
-template<typename TResponse>
-NThreading::TFuture<TResponse> InjectSessionStatusInterception(
- std::shared_ptr<TSession::TImpl>& impl, NThreading::TFuture<TResponse> asyncResponse,
- bool updateTimeout,
- TDuration timeout,
- std::function<void(const TResponse&, TSession::TImpl&)> cb = {})
-{
- auto promise = NThreading::NewPromise<TResponse>();
- asyncResponse.Subscribe([impl, promise, cb, updateTimeout, timeout](NThreading::TFuture<TResponse> future) mutable {
- Y_VERIFY(future.HasValue());
-
- // TResponse can hold refcounted user provided data (TSession for example)
- // and we do not want to have copy of it (for example it can cause delay dtor call)
- // so using move semantic here is mandatory.
- // Also we must reset captured shared pointer to session impl
- TResponse value = std::move(future.ExtractValue());
-
- const TStatus& status = GetStatus(value);
- // Exclude CLIENT_RESOURCE_EXHAUSTED from transport errors which can cause to session disconnect
- // since we have guarantee this request wasn't been started to execute.
-
- if (status.IsTransportError() && status.GetStatus() != EStatus::CLIENT_RESOURCE_EXHAUSTED) {
- impl->MarkBroken();
- } else if (status.GetStatus() == EStatus::SESSION_BUSY) {
- impl->MarkBroken();
- } else if (status.GetStatus() == EStatus::BAD_SESSION) {
- impl->MarkBroken();
- } else if (IsSessionCloseRequested(status)) {
- impl->MarkAsClosing();
- } else {
- // NOTE: About GetState and lock
- // Simultanious call multiple requests on the same session make no sence, due to server limitation.
- // But user can perform this call, right now we do not protect session from this, it may cause
- // raise on session state if respoise is not success.
- // It should not be a problem - in case of this race we close session
- // or put it in to settler.
- if (updateTimeout && status.GetStatus() != EStatus::CLIENT_RESOURCE_EXHAUSTED) {
- impl->ScheduleTimeToTouch(RandomizeThreshold(timeout), impl->GetState() == TSession::TImpl::EState::S_ACTIVE);
- }
- }
- if (cb) {
- cb(value, *impl);
- }
- impl.reset();
- promise.SetValue(std::move(value));
- });
- return promise.GetFuture();
-}
-
-static ui32 CalcBackoffTime(const TBackoffSettings& settings, ui32 retryNumber) {
- ui32 backoffSlots = 1 << std::min(retryNumber, settings.Ceiling_);
- TDuration maxDuration = settings.SlotDuration_ * backoffSlots;
-
- double uncertaintyRatio = std::max(std::min(settings.UncertainRatio_, 1.0), 0.0);
- double uncertaintyMultiplier = RandomNumber<double>() * uncertaintyRatio - uncertaintyRatio + 1.0;
-
- double durationMs = round(maxDuration.MilliSeconds() * uncertaintyMultiplier);
-
- return std::max(std::min(durationMs, (double)MAX_BACKOFF_DURATION_MS), 0.0);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TTableClient::TImpl: public TClientImplCommon<TTableClient::TImpl>, public IMigratorClient {
-public:
- using TReadTableStreamProcessorPtr = TTablePartIterator::TReaderImpl::TStreamProcessorPtr;
- using TScanQueryProcessorPtr = TScanQueryPartIterator::TReaderImpl::TStreamProcessorPtr;
-
- TImpl(std::shared_ptr<TGRpcConnectionsImpl>&& connections, const TClientSettings& settings)
- : TClientImplCommon(std::move(connections), settings)
- , Settings_(settings)
- , SessionPool_(Settings_.SessionPoolSettings_.MaxActiveSessions_)
- {
- if (!DbDriverState_->StatCollector.IsCollecting()) {
- return;
- }
-
- SetStatCollector(DbDriverState_->StatCollector.GetClientStatCollector());
- SessionPool_.SetStatCollector(DbDriverState_->StatCollector.GetSessionPoolStatCollector());
- }
-
- ~TImpl() {
- if (Connections_->GetDrainOnDtors()) {
- Drain().Wait();
- }
- }
-
- bool LinkObjToEndpoint(const TEndpointKey& endpoint, TEndpointObj* obj, const void* tag) {
- return DbDriverState_->EndpointPool.LinkObjToEndpoint(endpoint, obj, tag);
- }
-
- void InitStopper() {
- std::weak_ptr<TTableClient::TImpl> weak = shared_from_this();
- auto cb = [weak]() mutable {
- auto strong = weak.lock();
- if (!strong) {
- auto promise = NThreading::NewPromise<void>();
- promise.SetException("no more client");
- return promise.GetFuture();
- }
- return strong->Drain();
- };
-
- DbDriverState_->AddCb(std::move(cb), TDbDriverState::ENotifyType::STOP);
- }
-
- NThreading::TFuture<void> Drain() {
- TVector<std::unique_ptr<TSession::TImpl>> sessions;
- // No realocations under lock
- sessions.reserve(Settings_.SessionPoolSettings_.MaxActiveSessions_);
- auto drainer = [&sessions](std::unique_ptr<TSession::TImpl>&& impl) mutable {
- sessions.push_back(std::move(impl));
- return true;
- };
- SessionPool_.Drain(drainer, true);
- TVector<TAsyncStatus> closeResults;
- for (auto& s : sessions) {
- if (s->GetId()) {
- closeResults.push_back(CloseInternal(s.get()));
- }
- }
- sessions.clear();
- return NThreading::WaitExceptionOrAll(closeResults);
- }
-
- NThreading::TFuture<void> Stop() {
- return Drain();
- }
-
- void ScheduleTask(const std::function<void()>& fn, TDuration timeout) {
- std::weak_ptr<TTableClient::TImpl> weak = shared_from_this();
- auto cbGuard = [weak, fn]() {
- auto strongClient = weak.lock();
- if (strongClient) {
- fn();
- }
- };
- Connections_->ScheduleOneTimeTask(std::move(cbGuard), timeout);
- }
-
- void ScheduleTaskUnsafe(std::function<void()>&& fn, TDuration timeout) {
- Connections_->ScheduleOneTimeTask(std::move(fn), timeout);
- }
-
- void AsyncBackoff(const TBackoffSettings& settings, ui32 retryNumber, const std::function<void()>& fn) {
- auto durationMs = CalcBackoffTime(settings, retryNumber);
- ScheduleTask(fn, TDuration::MilliSeconds(durationMs));
- }
-
- void StartPeriodicSessionPoolTask() {
-
- auto deletePredicate = [](TSession::TImpl* session, TTableClient::TImpl* client, size_t sessionsCount) {
-
- const auto sessionPoolSettings = client->Settings_.SessionPoolSettings_;
- const auto spentTime = session->GetTimeToTouchFast() - session->GetTimeInPastFast();
-
- if (spentTime >= sessionPoolSettings.CloseIdleThreshold_) {
- if (sessionsCount > sessionPoolSettings.MinPoolSize_) {
- return true;
- }
- }
-
- return false;
- };
-
- auto keepAliveCmd = [](TSession session) {
- Y_VERIFY(session.GetId());
-
- const auto sessionPoolSettings = session.Client_->Settings_.SessionPoolSettings_;
- const auto spentTime = session.SessionImpl_->GetTimeToTouchFast() - session.SessionImpl_->GetTimeInPastFast();
-
- const auto maxTimeToTouch = GetMaxTimeToTouch(session.Client_->Settings_.SessionPoolSettings_);
- const auto minTimeToTouch = GetMinTimeToTouch(session.Client_->Settings_.SessionPoolSettings_);
-
- auto calcTimeToNextTouch = [maxTimeToTouch, minTimeToTouch] (const TDuration spent) {
- auto timeToNextTouch = minTimeToTouch;
- if (maxTimeToTouch > spent) {
- auto t = maxTimeToTouch - spent;
- timeToNextTouch = Min(t, minTimeToTouch);
- }
- return timeToNextTouch;
- };
-
- if (spentTime >= sessionPoolSettings.KeepAliveIdleThreshold_) {
-
- // Handle of session status will be done inside InjectSessionStatusInterception routine.
- // We just need to reschedule time to next call because InjectSessionStatusInterception doesn't
- // update timeInPast for calls from internal keep alive routine
- session.KeepAlive(KeepAliveSettings)
- .Subscribe([spentTime, session, maxTimeToTouch, calcTimeToNextTouch](TAsyncKeepAliveResult asyncResult) {
- if (!asyncResult.GetValue().IsSuccess())
- return;
-
- if (spentTime >= maxTimeToTouch) {
- auto timeToNextTouch = calcTimeToNextTouch(spentTime);
- session.SessionImpl_->ScheduleTimeToTouchFast(timeToNextTouch, true);
- }
- });
- return;
- }
-
- auto timeToNextTouch = calcTimeToNextTouch(spentTime);
- session.SessionImpl_->ScheduleTimeToTouchFast(
- RandomizeThreshold(timeToNextTouch),
- spentTime >= maxTimeToTouch
- );
- };
-
- std::weak_ptr<TTableClient::TImpl> weak = shared_from_this();
- Connections_->AddPeriodicTask(
- SessionPool_.CreatePeriodicTask(
- weak,
- std::move(keepAliveCmd),
- std::move(deletePredicate)
- ), PERIODIC_ACTION_INTERVAL);
- }
-
- static ui64 ScanForeignLocations(std::shared_ptr<TTableClient::TImpl> client) {
- size_t max = 0;
- ui64 result = 0;
-
- auto cb = [&result, &max](ui64 nodeId, const IObjRegistryHandle& handle) {
- const auto sz = handle.Size();
- if (sz > max) {
- result = nodeId;
- max = sz;
- }
- };
-
- client->DbDriverState_->ForEachForeignEndpoint(cb, client.get());
-
- return result;
- }
-
- static std::pair<ui64, size_t> ScanLocation(std::shared_ptr<TTableClient::TImpl> client,
- std::unordered_map<ui64, size_t>& sessions, bool allNodes)
- {
- std::pair<ui64, size_t> result = {0, 0};
-
- auto cb = [&result, &sessions](ui64 nodeId, const IObjRegistryHandle& handle) {
- const auto sz = handle.Size();
- sessions.insert({nodeId, sz});
- if (sz > result.second) {
- result.first = nodeId;
- result.second = sz;
- }
- };
-
- if (allNodes) {
- client->DbDriverState_->ForEachEndpoint(cb, client.get());
- } else {
- client->DbDriverState_->ForEachLocalEndpoint(cb, client.get());
- }
-
- return result;
- }
-
- static NMath::TStats CalcCV(const std::unordered_map<ui64, size_t>& in) {
- TVector<size_t> t;
- t.reserve(in.size());
- std::transform(in.begin(), in.end(), std::back_inserter(t), [](const std::pair<ui64, size_t>& pair) {
- return pair.second;
- });
- return NMath::CalcCV(t);
- }
-
- void StartPeriodicHostScanTask() {
- std::weak_ptr<TTableClient::TImpl> weak = shared_from_this();
-
- // The future in completed when we have finished current migrate task
- // and ready to accept new one
- std::pair<ui64, size_t> winner = {0, 0};
-
- auto periodicCb = [weak, winner](NYql::TIssues&&, EStatus status) mutable -> bool {
-
- if (status != EStatus::SUCCESS) {
- return false;
- }
-
- auto strongClient = weak.lock();
- if (!strongClient) {
- return false;
- } else {
- TRequestMigrator& migrator = strongClient->RequestMigrator_;
-
- const auto balancingPolicy = strongClient->DbDriverState_->GetBalancingPolicy();
-
- // Try to find any host at foreign locations if prefer local dc
- const ui64 foreignHost = (balancingPolicy == EBalancingPolicy::UsePreferableLocation) ?
- ScanForeignLocations(strongClient) : 0;
-
- std::unordered_map<ui64, size_t> hostMap;
-
- winner = ScanLocation(strongClient, hostMap,
- balancingPolicy == EBalancingPolicy::UseAllNodes);
-
- bool forceMigrate = false;
-
- // There is host in foreign locations
- if (foreignHost) {
- // But no hosts at local
- if (hostMap.empty()) {
- Y_VERIFY(!winner.second);
- // Scan whole cluster - we have no local dc
- winner = ScanLocation(strongClient, hostMap, true);
- } else {
- // We have local and foreign hosts, so force migration to local one
- forceMigrate = true;
- // Just replace source
- winner.first = foreignHost;
- winner.second++;
- }
- }
-
- const auto minCv = strongClient->Settings_.MinSessionCV_;
-
- const auto stats = CalcCV(hostMap);
-
- strongClient->DbDriverState_->StatCollector.SetSessionCV(stats.Cv);
-
- // Just scan to update monitoring counter ^^
- // Balancing feature is disabled.
- if (!minCv)
- return true;
-
- if (hostMap.size() < 2)
- return true;
-
- // Migrate last session only if move from foreign to local
- if (!forceMigrate && winner.second < 2)
- return true;
-
- if (stats.Cv > minCv || forceMigrate) {
- migrator.SetHost(winner.first);
- } else {
- migrator.SetHost(0);
- }
- return true;
- }
- };
-
- Connections_->AddPeriodicTask(std::move(periodicCb), HOSTSCAN_PERIODIC_ACTION_INTERVAL);
- }
-
- TAsyncCreateSessionResult GetSession(const TCreateSessionSettings& settings) {
- return SessionPool_.GetSession(shared_from_this(), settings);
- }
-
- i64 GetActiveSessionCount() const {
- return SessionPool_.GetActiveSessions();
- }
-
- i64 GetActiveSessionsLimit() const {
- return SessionPool_.GetActiveSessionsLimit();
- }
-
- i64 GetCurrentPoolSize() const {
- return SessionPool_.GetCurrentPoolSize();
- }
-
- TAsyncCreateSessionResult CreateSession(const TCreateSessionSettings& settings, bool standalone,
- TString preferedLocation = TString())
- {
- auto request = MakeOperationRequest<Ydb::Table::CreateSessionRequest>(settings);
-
- auto createSessionPromise = NewPromise<TCreateSessionResult>();
- auto self = shared_from_this();
- auto rpcSettings = TRpcRequestSettings::Make(settings);
- rpcSettings.Header.push_back({NYdb::YDB_CLIENT_CAPABILITIES, NYdb::YDB_CLIENT_CAPABILITY_SESSION_BALANCER});
-
- auto createSessionExtractor = [createSessionPromise, self, standalone]
- (google::protobuf::Any* any, TPlainStatus status) mutable {
- Ydb::Table::CreateSessionResult result;
- if (any) {
- any->UnpackTo(&result);
- }
- auto session = TSession(self, result.session_id(), status.Endpoint);
- if (status.Ok()) {
- if (standalone) {
- session.SessionImpl_->MarkStandalone();
- } else {
- session.SessionImpl_->MarkActive();
- }
- self->DbDriverState_->StatCollector.IncSessionsOnHost(status.Endpoint);
- } else {
- // We do not use SessionStatusInterception for CreateSession request
- session.SessionImpl_->MarkBroken();
- }
- TCreateSessionResult val(TStatus(std::move(status)), std::move(session));
- createSessionPromise.SetValue(std::move(val));
- };
-
- Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::CreateSessionRequest, Ydb::Table::CreateSessionResponse>(
- std::move(request),
- createSessionExtractor,
- &Ydb::Table::V1::TableService::Stub::AsyncCreateSession,
- DbDriverState_,
- INITIAL_DEFERRED_CALL_DELAY,
- rpcSettings,
- TEndpointKey(preferedLocation, 0));
-
- std::weak_ptr<TDbDriverState> state = DbDriverState_;
-
- return createSessionPromise.GetFuture();
- }
-
- TAsyncKeepAliveResult KeepAlive(const TSession::TImpl* session, const TKeepAliveSettings& settings) {
- auto request = MakeOperationRequest<Ydb::Table::KeepAliveRequest>(settings);
- request.set_session_id(session->GetId());
-
- auto keepAliveResultPromise = NewPromise<TKeepAliveResult>();
- auto self = shared_from_this();
-
- auto keepAliveExtractor = [keepAliveResultPromise, self]
- (google::protobuf::Any* any, TPlainStatus status) mutable {
- Ydb::Table::KeepAliveResult result;
- ESessionStatus sessionStatus = ESessionStatus::Unspecified;
- if (any) {
- any->UnpackTo(&result);
-
- switch (result.session_status()) {
- case Ydb::Table::KeepAliveResult_SessionStatus_SESSION_STATUS_READY:
- sessionStatus = ESessionStatus::Ready;
- break;
- case Ydb::Table::KeepAliveResult_SessionStatus_SESSION_STATUS_BUSY:
- sessionStatus = ESessionStatus::Busy;
- break;
- default:
- sessionStatus = ESessionStatus::Unspecified;
- }
- }
- TKeepAliveResult val(TStatus(std::move(status)), sessionStatus);
- keepAliveResultPromise.SetValue(std::move(val));
- };
-
- Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::KeepAliveRequest, Ydb::Table::KeepAliveResponse>(
- std::move(request),
- keepAliveExtractor,
- &Ydb::Table::V1::TableService::Stub::AsyncKeepAlive,
- DbDriverState_,
- INITIAL_DEFERRED_CALL_DELAY,
- TRpcRequestSettings::Make(settings),
- session->GetEndpointKey());
-
- return keepAliveResultPromise.GetFuture();
- }
-
- TFuture<TStatus> CreateTable(Ydb::Table::CreateTableRequest&& request, const TCreateTableSettings& settings)
- {
- return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::CreateTableRequest,Ydb::Table::CreateTableResponse>(
- std::move(request),
- &Ydb::Table::V1::TableService::Stub::AsyncCreateTable,
- TRpcRequestSettings::Make(settings));
- }
-
- TFuture<TStatus> AlterTable(Ydb::Table::AlterTableRequest&& request, const TAlterTableSettings& settings)
- {
- return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::AlterTableRequest, Ydb::Table::AlterTableResponse>(
- std::move(request),
- &Ydb::Table::V1::TableService::Stub::AsyncAlterTable,
- TRpcRequestSettings::Make(settings));
- }
-
- TAsyncOperation AlterTableLong(Ydb::Table::AlterTableRequest&& request, const TAlterTableSettings& settings)
- {
- using Ydb::Table::V1::TableService;
- using Ydb::Table::AlterTableRequest;
- using Ydb::Table::AlterTableResponse;
- return RunOperation<TableService, AlterTableRequest, AlterTableResponse, TOperation>(
- std::move(request),
- &Ydb::Table::V1::TableService::Stub::AsyncAlterTable,
- TRpcRequestSettings::Make(settings));
- }
-
- TFuture<TStatus> CopyTable(const TString& sessionId, const TString& src, const TString& dst,
- const TCopyTableSettings& settings)
- {
- auto request = MakeOperationRequest<Ydb::Table::CopyTableRequest>(settings);
- request.set_session_id(sessionId);
- request.set_source_path(src);
- request.set_destination_path(dst);
-
- return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::CopyTableRequest, Ydb::Table::CopyTableResponse>(
- std::move(request),
- &Ydb::Table::V1::TableService::Stub::AsyncCopyTable,
- TRpcRequestSettings::Make(settings));
- }
-
- TFuture<TStatus> CopyTables(Ydb::Table::CopyTablesRequest&& request, const TCopyTablesSettings& settings)
- {
- return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::CopyTablesRequest, Ydb::Table::CopyTablesResponse>(
- std::move(request),
- &Ydb::Table::V1::TableService::Stub::AsyncCopyTables,
- TRpcRequestSettings::Make(settings));
- }
-
- TFuture<TStatus> RenameTables(Ydb::Table::RenameTablesRequest&& request, const TRenameTablesSettings& settings)
- {
- return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::RenameTablesRequest, Ydb::Table::RenameTablesResponse>(
- std::move(request),
- &Ydb::Table::V1::TableService::Stub::AsyncRenameTables,
- TRpcRequestSettings::Make(settings));
- }
-
- TFuture<TStatus> DropTable(const TString& sessionId, const TString& path, const TDropTableSettings& settings) {
- auto request = MakeOperationRequest<Ydb::Table::DropTableRequest>(settings);
- request.set_session_id(sessionId);
- request.set_path(path);
-
- return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::DropTableRequest, Ydb::Table::DropTableResponse>(
- std::move(request),
- &Ydb::Table::V1::TableService::Stub::AsyncDropTable,
- TRpcRequestSettings::Make(settings));
- }
-
- TAsyncDescribeTableResult DescribeTable(const TString& sessionId, const TString& path, const TDescribeTableSettings& settings) {
- auto request = MakeOperationRequest<Ydb::Table::DescribeTableRequest>(settings);
- request.set_session_id(sessionId);
- request.set_path(path);
- if (settings.WithKeyShardBoundary_) {
- request.set_include_shard_key_bounds(true);
- }
-
- if (settings.WithTableStatistics_) {
- request.set_include_table_stats(true);
- }
-
- if (settings.WithPartitionStatistics_) {
- request.set_include_partition_stats(true);
- }
-
- auto promise = NewPromise<TDescribeTableResult>();
-
- auto extractor = [promise, settings]
- (google::protobuf::Any* any, TPlainStatus status) mutable {
- Ydb::Table::DescribeTableResult result;
- if (any) {
- any->UnpackTo(&result);
- }
- TDescribeTableResult describeTableResult(TStatus(std::move(status)),
- std::move(result), settings);
- promise.SetValue(std::move(describeTableResult));
- };
-
- Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::DescribeTableRequest, Ydb::Table::DescribeTableResponse>(
- std::move(request),
- extractor,
- &Ydb::Table::V1::TableService::Stub::AsyncDescribeTable,
- DbDriverState_,
- INITIAL_DEFERRED_CALL_DELAY,
- TRpcRequestSettings::Make(settings));
-
- return promise.GetFuture();
- }
-
- template<typename TParamsType>
- TAsyncDataQueryResult ExecuteDataQuery(TSession& session, const TString& query, const TTxControl& txControl,
- TParamsType params, const TExecDataQuerySettings& settings)
- {
- auto maybeQuery = session.SessionImpl_->GetQueryFromCache(query, Settings_.AllowRequestMigration_);
- if (maybeQuery) {
- TDataQuery dataQuery(session, query, maybeQuery->QueryId, maybeQuery->ParameterTypes);
- return ExecuteDataQuery(session, dataQuery, txControl, params, settings, true);
- }
-
- CacheMissCounter.Inc();
-
- return InjectSessionStatusInterception(session.SessionImpl_,
- ExecuteDataQueryInternal(session, query, txControl, params, settings, false),
- true, GetMinTimeToTouch(Settings_.SessionPoolSettings_));
- }
-
- template<typename TParamsType>
- TAsyncDataQueryResult ExecuteDataQuery(TSession& session, const TDataQuery& dataQuery, const TTxControl& txControl,
- TParamsType params, const TExecDataQuerySettings& settings,
- bool fromCache)
- {
- TString queryKey = dataQuery.Impl_->GetTextHash();
- auto cb = [queryKey](const TDataQueryResult& result, TSession::TImpl& session) {
- if (result.GetStatus() == EStatus::NOT_FOUND) {
- session.InvalidateQueryInCache(queryKey);
- }
- };
-
- return InjectSessionStatusInterception<TDataQueryResult>(
- session.SessionImpl_,
- session.Client_->ExecuteDataQueryInternal(session, dataQuery, txControl, params, settings, fromCache),
- true,
- GetMinTimeToTouch(session.Client_->Settings_.SessionPoolSettings_),
- cb);
- }
-
- TAsyncPrepareQueryResult PrepareDataQuery(const TSession& session, const TString& query,
- const TPrepareDataQuerySettings& settings)
- {
- auto request = MakeOperationRequest<Ydb::Table::PrepareDataQueryRequest>(settings);
- request.set_session_id(session.GetId());
- request.set_yql_text(query);
-
- auto promise = NewPromise<TPrepareQueryResult>();
-
- //See ExecuteDataQueryInternal for explanation
- auto sessionPtr = new TSession(session);
- auto extractor = [promise, sessionPtr, query]
- (google::protobuf::Any* any, TPlainStatus status) mutable {
- TDataQuery dataQuery(*sessionPtr, query, "");
-
- if (any) {
- Ydb::Table::PrepareQueryResult result;
- any->UnpackTo(&result);
-
- if (status.Ok()) {
- dataQuery = TDataQuery(*sessionPtr, query, result.query_id(), result.parameters_types());
- sessionPtr->SessionImpl_->AddQueryToCache(dataQuery);
- }
- }
-
- TPrepareQueryResult prepareQueryResult(TStatus(std::move(status)),
- dataQuery, false);
- delete sessionPtr;
- promise.SetValue(std::move(prepareQueryResult));
- };
-
- CollectQuerySize(query, QuerySizeHistogram);
-
- Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::PrepareDataQueryRequest, Ydb::Table::PrepareDataQueryResponse>(
- std::move(request),
- extractor,
- &Ydb::Table::V1::TableService::Stub::AsyncPrepareDataQuery,
- DbDriverState_,
- INITIAL_DEFERRED_CALL_DELAY,
- TRpcRequestSettings::Make(settings),
- session.SessionImpl_->GetEndpointKey());
-
- return promise.GetFuture();
- }
-
- TAsyncStatus ExecuteSchemeQuery(const TString& sessionId, const TString& query,
- const TExecSchemeQuerySettings& settings)
- {
- auto request = MakeOperationRequest<Ydb::Table::ExecuteSchemeQueryRequest>(settings);
- request.set_session_id(sessionId);
- request.set_yql_text(query);
-
- return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::ExecuteSchemeQueryRequest, Ydb::Table::ExecuteSchemeQueryResponse>(
- std::move(request),
- &Ydb::Table::V1::TableService::Stub::AsyncExecuteSchemeQuery,
- TRpcRequestSettings::Make(settings));
- }
-
- TAsyncBeginTransactionResult BeginTransaction(const TSession& session, const TTxSettings& txSettings,
- const TBeginTxSettings& settings)
- {
- auto request = MakeOperationRequest<Ydb::Table::BeginTransactionRequest>(settings);
- request.set_session_id(session.GetId());
- SetTxSettings(txSettings, request.mutable_tx_settings());
-
- auto promise = NewPromise<TBeginTransactionResult>();
-
- auto extractor = [promise, session]
- (google::protobuf::Any* any, TPlainStatus status) mutable {
- TString txId;
- if (any) {
- Ydb::Table::BeginTransactionResult result;
- any->UnpackTo(&result);
- txId = result.tx_meta().id();
- }
-
- TBeginTransactionResult beginTxResult(TStatus(std::move(status)),
- TTransaction(session, txId));
- promise.SetValue(std::move(beginTxResult));
- };
-
- Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::BeginTransactionRequest, Ydb::Table::BeginTransactionResponse>(
- std::move(request),
- extractor,
- &Ydb::Table::V1::TableService::Stub::AsyncBeginTransaction,
- DbDriverState_,
- INITIAL_DEFERRED_CALL_DELAY,
- TRpcRequestSettings::Make(settings),
- session.SessionImpl_->GetEndpointKey());
-
- return promise.GetFuture();
- }
-
- TAsyncCommitTransactionResult CommitTransaction(const TSession& session, const TTransaction& tx,
- const TCommitTxSettings& settings)
- {
- auto request = MakeOperationRequest<Ydb::Table::CommitTransactionRequest>(settings);
- request.set_session_id(session.GetId());
- request.set_tx_id(tx.GetId());
- request.set_collect_stats(GetStatsCollectionMode(settings.CollectQueryStats_));
-
- auto promise = NewPromise<TCommitTransactionResult>();
-
- auto extractor = [promise]
- (google::protobuf::Any* any, TPlainStatus status) mutable {
- TMaybe<TQueryStats> queryStats;
- if (any) {
- Ydb::Table::CommitTransactionResult result;
- any->UnpackTo(&result);
-
- if (result.has_query_stats()) {
- queryStats = TQueryStats(result.query_stats());
- }
- }
-
- TCommitTransactionResult commitTxResult(TStatus(std::move(status)), queryStats);
- promise.SetValue(std::move(commitTxResult));
- };
-
- Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::CommitTransactionRequest, Ydb::Table::CommitTransactionResponse>(
- std::move(request),
- extractor,
- &Ydb::Table::V1::TableService::Stub::AsyncCommitTransaction,
- DbDriverState_,
- INITIAL_DEFERRED_CALL_DELAY,
- TRpcRequestSettings::Make(settings),
- session.SessionImpl_->GetEndpointKey());
-
- return promise.GetFuture();
- }
-
- TAsyncStatus RollbackTransaction(const TSession& session, const TTransaction& tx,
- const TRollbackTxSettings& settings)
- {
- auto request = MakeOperationRequest<Ydb::Table::RollbackTransactionRequest>(settings);
- request.set_session_id(session.GetId());
- request.set_tx_id(tx.GetId());
-
- return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::RollbackTransactionRequest, Ydb::Table::RollbackTransactionResponse>(
- std::move(request),
- &Ydb::Table::V1::TableService::Stub::AsyncRollbackTransaction,
- TRpcRequestSettings::Make(settings),
- session.SessionImpl_->GetEndpointKey());
- }
-
- TAsyncExplainDataQueryResult ExplainDataQuery(const TSession& session, const TString& query,
- const TExplainDataQuerySettings& settings)
- {
- auto request = MakeOperationRequest<Ydb::Table::ExplainDataQueryRequest>(settings);
- request.set_session_id(session.GetId());
- request.set_yql_text(query);
-
- auto promise = NewPromise<TExplainQueryResult>();
-
- auto extractor = [promise]
- (google::protobuf::Any* any, TPlainStatus status) mutable {
- TString ast;
- TString plan;
- if (any) {
- Ydb::Table::ExplainQueryResult result;
- any->UnpackTo(&result);
- ast = result.query_ast();
- plan = result.query_plan();
- }
- TExplainQueryResult val(TStatus(std::move(status)),
- std::move(plan), std::move(ast));
- promise.SetValue(std::move(val));
- };
-
- Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::ExplainDataQueryRequest, Ydb::Table::ExplainDataQueryResponse>(
- std::move(request),
- extractor,
- &Ydb::Table::V1::TableService::Stub::AsyncExplainDataQuery,
- DbDriverState_,
- INITIAL_DEFERRED_CALL_DELAY,
- TRpcRequestSettings::Make(settings),
- session.SessionImpl_->GetEndpointKey());
-
- return promise.GetFuture();
- }
-
- static void SetTypedValue(Ydb::TypedValue* protoValue, const TValue& value) {
- protoValue->mutable_type()->CopyFrom(TProtoAccessor::GetProto(value.GetType()));
- protoValue->mutable_value()->CopyFrom(TProtoAccessor::GetProto(value));
- }
-
- NThreading::TFuture<std::pair<TPlainStatus, TReadTableStreamProcessorPtr>> ReadTable(
- const TString& sessionId,
- const TString& path,
- const TReadTableSettings& settings)
- {
- auto request = MakeRequest<Ydb::Table::ReadTableRequest>();
- request.set_session_id(sessionId);
- request.set_path(path);
- request.set_ordered(settings.Ordered_);
- if (settings.RowLimit_) {
- request.set_row_limit(settings.RowLimit_.GetRef());
- }
- for (const auto& col : settings.Columns_) {
- request.add_columns(col);
- }
- if (settings.UseSnapshot_) {
- request.set_use_snapshot(
- settings.UseSnapshot_.GetRef()
- ? Ydb::FeatureFlag::ENABLED
- : Ydb::FeatureFlag::DISABLED);
- }
-
- if (settings.From_) {
- const auto& from = settings.From_.GetRef();
- if (from.IsInclusive()) {
- SetTypedValue(request.mutable_key_range()->mutable_greater_or_equal(), from.GetValue());
- } else {
- SetTypedValue(request.mutable_key_range()->mutable_greater(), from.GetValue());
- }
- }
-
- if (settings.To_) {
- const auto& to = settings.To_.GetRef();
- if (to.IsInclusive()) {
- SetTypedValue(request.mutable_key_range()->mutable_less_or_equal(), to.GetValue());
- } else {
- SetTypedValue(request.mutable_key_range()->mutable_less(), to.GetValue());
- }
- }
-
- auto promise = NewPromise<std::pair<TPlainStatus, TReadTableStreamProcessorPtr>>();
-
- Connections_->StartReadStream<Ydb::Table::V1::TableService, Ydb::Table::ReadTableRequest, Ydb::Table::ReadTableResponse>(
- std::move(request),
- [promise] (TPlainStatus status, TReadTableStreamProcessorPtr processor) mutable {
- promise.SetValue(std::make_pair(status, processor));
- },
- &Ydb::Table::V1::TableService::Stub::AsyncStreamReadTable,
- DbDriverState_,
- TRpcRequestSettings::Make(settings));
-
- return promise.GetFuture();
-
- }
-
- TAsyncReadRowsResult ReadRows(const TString& path, TValue&& keys, const TReadRowsSettings& settings) {
- auto request = MakeRequest<Ydb::Table::ReadRowsRequest>();
- request.set_path(path);
- auto* protoKeys = request.mutable_keys();
- *protoKeys->mutable_type() = TProtoAccessor::GetProto(keys.GetType());
- *protoKeys->mutable_value() = TProtoAccessor::GetProto(keys);
-
- auto promise = NewPromise<TReadRowsResult>();
-
- auto responseCb = [promise]
- (Ydb::Table::ReadRowsResponse* response, TPlainStatus status) mutable {
- Y_VERIFY(response);
- TResultSet resultSet = TResultSet(response->result_set());
- TReadRowsResult val(TStatus(std::move(status)), std::move(resultSet));
- promise.SetValue(std::move(val));
- };
-
- Connections_->Run<Ydb::Table::V1::TableService, Ydb::Table::ReadRowsRequest, Ydb::Table::ReadRowsResponse>(
- std::move(request),
- responseCb,
- &Ydb::Table::V1::TableService::Stub::AsyncReadRows,
- DbDriverState_,
- TRpcRequestSettings::Make(settings), // requestSettings
- TEndpointKey() // preferredEndpoint
- );
-
- return promise.GetFuture();
- }
-
- TAsyncStatus Close(const TSession::TImpl* sessionImpl, const TCloseSessionSettings& settings) {
- auto request = MakeOperationRequest<Ydb::Table::DeleteSessionRequest>(settings);
- request.set_session_id(sessionImpl->GetId());
- return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::DeleteSessionRequest, Ydb::Table::DeleteSessionResponse>(
- std::move(request),
- &Ydb::Table::V1::TableService::Stub::AsyncDeleteSession,
- TRpcRequestSettings::Make(settings),
- sessionImpl->GetEndpointKey());
- }
-
- TAsyncStatus CloseInternal(const TSession::TImpl* sessionImpl) {
- static const auto internalCloseSessionSettings = TCloseSessionSettings()
- .ClientTimeout(TDuration::Seconds(2));
-
- auto driver = Connections_;
- return Close(sessionImpl, internalCloseSessionSettings)
- .Apply([driver{std::move(driver)}](TAsyncStatus status) mutable
- {
- driver.reset();
- return status;
- });
- }
-
- bool ReturnSession(TSession::TImpl* sessionImpl) {
- Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_ACTIVE ||
- sessionImpl->GetState() == TSession::TImpl::S_IDLE);
-
- if (RequestMigrator_.DoCheckAndMigrate(sessionImpl)) {
- SessionRemovedDueBalancing.Inc();
- return false;
- }
-
- bool needUpdateCounter = sessionImpl->NeedUpdateActiveCounter();
- // Also removes NeedUpdateActiveCounter flag
- sessionImpl->MarkIdle();
- sessionImpl->SetTimeInterval(TDuration::Zero());
- if (!SessionPool_.ReturnSession(sessionImpl, needUpdateCounter)) {
- sessionImpl->SetNeedUpdateActiveCounter(needUpdateCounter);
- return false;
- }
- return true;
- }
-
- void DeleteSession(TSession::TImpl* sessionImpl) {
- if (sessionImpl->NeedUpdateActiveCounter()) {
- SessionPool_.DecrementActiveCounter();
- }
-
- if (sessionImpl->GetId()) {
- CloseInternal(sessionImpl);
- DbDriverState_->StatCollector.DecSessionsOnHost(sessionImpl->GetEndpoint());
- }
-
- delete sessionImpl;
- }
-
- ui32 GetSessionRetryLimit() const {
- return Settings_.SessionPoolSettings_.RetryLimit_;
- }
-
- void SetStatCollector(const NSdkStats::TStatCollector::TClientStatCollector& collector) {
- CacheMissCounter.Set(collector.CacheMiss);
- QuerySizeHistogram.Set(collector.QuerySize);
- ParamsSizeHistogram.Set(collector.ParamsSize);
- RetryOperationStatCollector = collector.RetryOperationStatCollector;
- SessionRemovedDueBalancing.Set(collector.SessionRemovedDueBalancing);
- RequestMigrated.Set(collector.RequestMigrated);
- }
-
- TAsyncBulkUpsertResult BulkUpsert(const TString& table, TValue&& rows, const TBulkUpsertSettings& settings) {
- auto request = MakeOperationRequest<Ydb::Table::BulkUpsertRequest>(settings);
- request.set_table(table);
- *request.mutable_rows()->mutable_type() = TProtoAccessor::GetProto(rows.GetType());
- *request.mutable_rows()->mutable_value() = std::move(rows.GetProto());
-
- auto promise = NewPromise<TBulkUpsertResult>();
-
- auto extractor = [promise]
- (google::protobuf::Any* any, TPlainStatus status) mutable {
- Y_UNUSED(any);
- TBulkUpsertResult val(TStatus(std::move(status)));
- promise.SetValue(std::move(val));
- };
-
- Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::BulkUpsertRequest, Ydb::Table::BulkUpsertResponse>(
- std::move(request),
- extractor,
- &Ydb::Table::V1::TableService::Stub::AsyncBulkUpsert,
- DbDriverState_,
- INITIAL_DEFERRED_CALL_DELAY,
- TRpcRequestSettings::Make(settings));
-
- return promise.GetFuture();
- }
-
- TAsyncBulkUpsertResult BulkUpsert(const TString& table, EDataFormat format,
- const TString& data, const TString& schema, const TBulkUpsertSettings& settings)
- {
- auto request = MakeOperationRequest<Ydb::Table::BulkUpsertRequest>(settings);
- request.set_table(table);
- if (format == EDataFormat::ApacheArrow) {
- request.mutable_arrow_batch_settings()->set_schema(schema);
- } else if (format == EDataFormat::CSV) {
- auto* csv_settings = request.mutable_csv_settings();
- const auto& format_settings = settings.FormatSettings_;
- if (!format_settings.empty()) {
- bool ok = csv_settings->ParseFromString(format_settings);
- if (!ok) {
- return {};
- }
- }
- }
- request.set_data(data);
-
- auto promise = NewPromise<TBulkUpsertResult>();
-
- auto extractor = [promise]
- (google::protobuf::Any* any, TPlainStatus status) mutable {
- Y_UNUSED(any);
- TBulkUpsertResult val(TStatus(std::move(status)));
- promise.SetValue(std::move(val));
- };
-
- Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::BulkUpsertRequest, Ydb::Table::BulkUpsertResponse>(
- std::move(request),
- extractor,
- &Ydb::Table::V1::TableService::Stub::AsyncBulkUpsert,
- DbDriverState_,
- INITIAL_DEFERRED_CALL_DELAY,
- TRpcRequestSettings::Make(settings));
-
- return promise.GetFuture();
- }
-
- TFuture<std::pair<TPlainStatus, TScanQueryProcessorPtr>> StreamExecuteScanQueryInternal(const TString& query,
- const ::google::protobuf::Map<TString, Ydb::TypedValue>* params,
- const TStreamExecScanQuerySettings& settings)
- {
- auto request = MakeRequest<Ydb::Table::ExecuteScanQueryRequest>();
- request.mutable_query()->set_yql_text(query);
-
- if (params) {
- *request.mutable_parameters() = *params;
- }
-
- if (settings.Explain_) {
- request.set_mode(Ydb::Table::ExecuteScanQueryRequest::MODE_EXPLAIN);
- } else {
- request.set_mode(Ydb::Table::ExecuteScanQueryRequest::MODE_EXEC);
- }
-
- request.set_collect_stats(GetStatsCollectionMode(settings.CollectQueryStats_));
-
- auto promise = NewPromise<std::pair<TPlainStatus, TScanQueryProcessorPtr>>();
-
- Connections_->StartReadStream<
- Ydb::Table::V1::TableService,
- Ydb::Table::ExecuteScanQueryRequest,
- Ydb::Table::ExecuteScanQueryPartialResponse>
- (
- std::move(request),
- [promise] (TPlainStatus status, TScanQueryProcessorPtr processor) mutable {
- promise.SetValue(std::make_pair(status, processor));
- },
- &Ydb::Table::V1::TableService::Stub::AsyncStreamExecuteScanQuery,
- DbDriverState_,
- TRpcRequestSettings::Make(settings)
- );
-
- return promise.GetFuture();
- }
-
- TAsyncScanQueryPartIterator StreamExecuteScanQuery(const TString& query,
- const ::google::protobuf::Map<TString, Ydb::TypedValue>* params,
- const TStreamExecScanQuerySettings& settings)
- {
- auto promise = NewPromise<TScanQueryPartIterator>();
-
- auto iteratorCallback = [promise](TFuture<std::pair<TPlainStatus,
- TTableClient::TImpl::TScanQueryProcessorPtr>> future) mutable
- {
- Y_ASSERT(future.HasValue());
- auto pair = future.ExtractValue();
- promise.SetValue(TScanQueryPartIterator(
- pair.second
- ? std::make_shared<TScanQueryPartIterator::TReaderImpl>(pair.second, pair.first.Endpoint)
- : nullptr,
- std::move(pair.first))
- );
- };
-
- StreamExecuteScanQueryInternal(query, params, settings).Subscribe(iteratorCallback);
- return promise.GetFuture();
- }
-
- static void CloseAndDeleteSession(
- std::unique_ptr<TSession::TImpl>&& impl,
- std::shared_ptr<TTableClient::TImpl> client);
-public:
- TClientSettings Settings_;
-
-private:
- static void SetParams(
- ::google::protobuf::Map<TString, Ydb::TypedValue>* params,
- Ydb::Table::ExecuteDataQueryRequest* request)
- {
- if (params) {
- request->mutable_parameters()->swap(*params);
- }
- }
-
- static void SetParams(
- const ::google::protobuf::Map<TString, Ydb::TypedValue>& params,
- Ydb::Table::ExecuteDataQueryRequest* request)
- {
- *request->mutable_parameters() = params;
- }
-
- static void CollectParams(
- ::google::protobuf::Map<TString, Ydb::TypedValue>* params,
- NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> histgoram)
- {
-
- if (params && histgoram.IsCollecting()) {
- size_t size = 0;
- for (auto& keyvalue: *params) {
- size += keyvalue.second.ByteSizeLong();
- }
- histgoram.Record(size);
- }
- }
-
- static void CollectParams(
- const ::google::protobuf::Map<TString, Ydb::TypedValue>& params,
- NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> histgoram)
- {
-
- if (histgoram.IsCollecting()) {
- size_t size = 0;
- for (auto& keyvalue: params) {
- size += keyvalue.second.ByteSizeLong();
- }
- histgoram.Record(size);
- }
- }
-
- static void CollectQuerySize(const TString& query, NSdkStats::TAtomicHistogram<::NMonitoring::THistogram>& querySizeHistogram) {
- if (querySizeHistogram.IsCollecting()) {
- querySizeHistogram.Record(query.size());
- }
- }
-
- static void CollectQuerySize(const TDataQuery&, NSdkStats::TAtomicHistogram<::NMonitoring::THistogram>&) {}
-
- template <typename TQueryType, typename TParamsType>
- TAsyncDataQueryResult ExecuteDataQueryInternal(const TSession& session, const TQueryType& query,
- const TTxControl& txControl, TParamsType params,
- const TExecDataQuerySettings& settings, bool fromCache)
- {
- auto request = MakeOperationRequest<Ydb::Table::ExecuteDataQueryRequest>(settings);
- request.set_session_id(session.GetId());
- auto txControlProto = request.mutable_tx_control();
- txControlProto->set_commit_tx(txControl.CommitTx_);
- if (txControl.TxId_) {
- txControlProto->set_tx_id(*txControl.TxId_);
- } else {
- SetTxSettings(txControl.BeginTx_, txControlProto->mutable_begin_tx());
- }
-
- request.set_collect_stats(GetStatsCollectionMode(settings.CollectQueryStats_));
-
- SetQuery(query, request.mutable_query());
- CollectQuerySize(query, QuerySizeHistogram);
-
- SetParams(params, &request);
- CollectParams(params, ParamsSizeHistogram);
-
- SetQueryCachePolicy(query, settings, request.mutable_query_cache_policy());
-
- auto promise = NewPromise<TDataQueryResult>();
- bool keepInCache = settings.KeepInQueryCache_ && settings.KeepInQueryCache_.GetRef();
-
- // We don't want to delay call of TSession dtor, so we can't capture it by copy
- // otherwise we break session pool and other clients logic.
- // Same problem with TDataQuery and TTransaction
- //
- // The fast solution is:
- // - create copy of TSession out of lambda
- // - capture pointer
- // - call free just before SetValue call
- auto sessionPtr = new TSession(session);
- auto extractor = [promise, sessionPtr, query, fromCache, keepInCache]
- (google::protobuf::Any* any, TPlainStatus status) mutable {
- TVector<TResultSet> res;
- TMaybe<TTransaction> tx;
- TMaybe<TDataQuery> dataQuery;
- TMaybe<TQueryStats> queryStats;
-
- auto queryText = GetQueryText(query);
- if (any) {
- Ydb::Table::ExecuteQueryResult result;
- any->UnpackTo(&result);
-
- for (size_t i = 0; i < result.result_setsSize(); i++) {
- res.push_back(TResultSet(*result.mutable_result_sets(i)));
- }
-
- if (result.has_tx_meta()) {
- tx = TTransaction(*sessionPtr, result.tx_meta().id());
- }
-
- if (result.has_query_meta()) {
- if (queryText) {
- auto& query_meta = result.query_meta();
- dataQuery = TDataQuery(*sessionPtr, *queryText, query_meta.id(), query_meta.parameters_types());
- }
- }
-
- if (result.has_query_stats()) {
- queryStats = TQueryStats(result.query_stats());
- }
- }
-
- if (keepInCache && dataQuery && queryText) {
- sessionPtr->SessionImpl_->AddQueryToCache(*dataQuery);
- }
-
- TDataQueryResult dataQueryResult(TStatus(std::move(status)),
- std::move(res), tx, dataQuery, fromCache, queryStats);
-
- delete sessionPtr;
- tx.Clear();
- dataQuery.Clear();
- promise.SetValue(std::move(dataQueryResult));
- };
-
- Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::ExecuteDataQueryRequest, Ydb::Table::ExecuteDataQueryResponse>(
- std::move(request),
- extractor,
- &Ydb::Table::V1::TableService::Stub::AsyncExecuteDataQuery,
- DbDriverState_,
- INITIAL_DEFERRED_CALL_DELAY,
- TRpcRequestSettings::Make(settings),
- session.SessionImpl_->GetEndpointKey());
-
- return promise.GetFuture();
- }
-
- static void SetTxSettings(const TTxSettings& txSettings, Ydb::Table::TransactionSettings* proto)
- {
- switch (txSettings.Mode_) {
- case TTxSettings::TS_SERIALIZABLE_RW:
- proto->mutable_serializable_read_write();
- break;
- case TTxSettings::TS_ONLINE_RO:
- proto->mutable_online_read_only()->set_allow_inconsistent_reads(
- txSettings.OnlineSettings_.AllowInconsistentReads_);
- break;
- case TTxSettings::TS_STALE_RO:
- proto->mutable_stale_read_only();
- break;
- case TTxSettings::TS_SNAPSHOT_RO:
- proto->mutable_snapshot_read_only();
- break;
- default:
- throw TContractViolation("Unexpected transaction mode.");
- }
- }
-
- static void SetQuery(const TString& queryText, Ydb::Table::Query* query) {
- query->set_yql_text(queryText);
- }
-
- static void SetQuery(const TDataQuery& queryData, Ydb::Table::Query* query) {
- query->set_id(queryData.GetId());
- }
-
- static void SetQueryCachePolicy(const TString&, const TExecDataQuerySettings& settings,
- Ydb::Table::QueryCachePolicy* queryCachePolicy)
- {
- queryCachePolicy->set_keep_in_cache(settings.KeepInQueryCache_ ? settings.KeepInQueryCache_.GetRef() : false);
- }
-
- static void SetQueryCachePolicy(const TDataQuery&, const TExecDataQuerySettings& settings,
- Ydb::Table::QueryCachePolicy* queryCachePolicy) {
- queryCachePolicy->set_keep_in_cache(settings.KeepInQueryCache_ ? settings.KeepInQueryCache_.GetRef() : true);
- }
-
- static TMaybe<TString> GetQueryText(const TString& queryText) {
- return queryText;
- }
-
- static TMaybe<TString> GetQueryText(const TDataQuery& queryData) {
- return queryData.GetText();
- }
-
-public:
- NSdkStats::TAtomicCounter<::NMonitoring::TRate> CacheMissCounter;
- NSdkStats::TStatCollector::TClientRetryOperationStatCollector RetryOperationStatCollector;
- NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> QuerySizeHistogram;
- NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> ParamsSizeHistogram;
- NSdkStats::TAtomicCounter<::NMonitoring::TRate> SessionRemovedDueBalancing;
- NSdkStats::TAtomicCounter<::NMonitoring::TRate> RequestMigrated;
-
-private:
- TSessionPoolImpl SessionPool_;
- TRequestMigrator RequestMigrator_;
- static const TKeepAliveSettings KeepAliveSettings;
-};
-
-const TKeepAliveSettings TTableClient::TImpl::KeepAliveSettings = TKeepAliveSettings().ClientTimeout(KEEP_ALIVE_CLIENT_TIMEOUT);
-
-TSessionPoolImpl::TSessionPoolImpl(ui32 maxActiveSessions)
- : Closed_(false)
- , ActiveSessions_(0)
- , MaxActiveSessions_(maxActiveSessions)
-{}
-
-void TTableClient::TImpl::CloseAndDeleteSession(std::unique_ptr<TSession::TImpl>&& impl,
- std::shared_ptr<TTableClient::TImpl> client) {
- std::shared_ptr<TSession::TImpl> deleteSession(
- impl.release(),
- TSession::TImpl::GetSmartDeleter(client));
-
- deleteSession->MarkBroken();
-}
-
-void TSessionPoolImpl::CreateFakeSession(
- NThreading::TPromise<TCreateSessionResult>& promise,
- std::shared_ptr<TTableClient::TImpl> client)
-{
- TSession session(client, "", "");
- // Mark broken to prevent returning to session pool
- session.SessionImpl_->MarkBroken();
- TCreateSessionResult val(
- TStatus(
- TPlainStatus(
- EStatus::CLIENT_RESOURCE_EXHAUSTED,
- "Active sessions limit exceeded"
- )
- ),
- std::move(session)
- );
-
- client->ScheduleTaskUnsafe([promise, val{std::move(val)}]() mutable {
- promise.SetValue(std::move(val));
- }, TDuration());
-}
-
-TAsyncCreateSessionResult TSessionPoolImpl::GetSession(
- std::shared_ptr<TTableClient::TImpl> client,
- const TCreateSessionSettings& settings)
-{
- auto createSessionPromise = NewPromise<TCreateSessionResult>();
- std::unique_ptr<TSession::TImpl> sessionImpl;
- bool needUpdateActiveSessionCounter = false;
- bool returnFakeSession = false;
- {
- std::lock_guard guard(Mtx_);
- if (MaxActiveSessions_) {
- if (ActiveSessions_ < MaxActiveSessions_) {
- ActiveSessions_++;
- needUpdateActiveSessionCounter = true;
- } else {
- returnFakeSession = true;
- }
- } else {
- ActiveSessions_++;
- needUpdateActiveSessionCounter = true;
- }
- if (!Sessions_.empty()) {
- auto it = std::prev(Sessions_.end());
- sessionImpl = std::move(it->second);
- Sessions_.erase(it);
- }
- UpdateStats();
- }
- if (returnFakeSession) {
- FakeSessionsCounter_.Inc();
- CreateFakeSession(createSessionPromise, client);
- return createSessionPromise.GetFuture();
- } else if (sessionImpl) {
- Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_IDLE);
- Y_VERIFY(!sessionImpl->GetTimeInterval());
- Y_VERIFY(needUpdateActiveSessionCounter);
- sessionImpl->MarkActive();
- sessionImpl->SetNeedUpdateActiveCounter(true);
-
- TCreateSessionResult val(TStatus(TPlainStatus()),
- TSession(client, std::shared_ptr<TSession::TImpl>(
- sessionImpl.release(),
- TSession::TImpl::GetSmartDeleter(client))));
-
- client->ScheduleTaskUnsafe([createSessionPromise, val{std::move(val)}]() mutable {
- createSessionPromise.SetValue(std::move(val));
- }, TDuration());
-
- return createSessionPromise.GetFuture();
- } else {
- const auto& sessionResult = client->CreateSession(settings, false);
- sessionResult.Subscribe(TSession::TImpl::GetSessionInspector(createSessionPromise, client, settings, 0, needUpdateActiveSessionCounter));
- return createSessionPromise.GetFuture();
- }
-}
-
-bool TSessionPoolImpl::DropSessionOnEndpoint(std::shared_ptr<TTableClient::TImpl> client, ui64 nodeId) {
- std::unique_ptr<TSession::TImpl> sessionImpl;
- {
- std::lock_guard guard(Mtx_);
- for (auto it = Sessions_.begin(); it != Sessions_.end(); it++) {
- if (it->second->GetEndpointKey().GetNodeId() == nodeId) {
- sessionImpl = std::move(it->second);
- Sessions_.erase(it);
- break;
- }
- }
- }
- if (!sessionImpl)
- return false;
-
- auto deleteFn = TSession::TImpl::GetSmartDeleter(client);
- deleteFn(sessionImpl.release());
- return true;
-}
-
-bool TSessionPoolImpl::ReturnSession(TSession::TImpl* impl, bool active) {
- {
- std::lock_guard guard(Mtx_);
- if (Closed_)
- return false;
- Sessions_.emplace(std::make_pair(impl->GetTimeToTouchFast(), impl));
- if (active) {
- Y_VERIFY(ActiveSessions_);
- ActiveSessions_--;
- impl->SetNeedUpdateActiveCounter(false);
- }
- UpdateStats();
- }
- return true;
-}
-
-void TSessionPoolImpl::DecrementActiveCounter() {
- std::lock_guard guard(Mtx_);
- Y_VERIFY(ActiveSessions_);
- ActiveSessions_--;
- UpdateStats();
-}
-
-void TSessionPoolImpl::Drain(std::function<bool(std::unique_ptr<TSession::TImpl>&&)> cb, bool close) {
- std::lock_guard guard(Mtx_);
- Closed_ = close;
- for (auto it = Sessions_.begin(); it != Sessions_.end();) {
- const bool cont = cb(std::move(it->second));
- it = Sessions_.erase(it);
- if (!cont)
- break;
- }
- UpdateStats();
-}
-
-TPeriodicCb TSessionPoolImpl::CreatePeriodicTask(std::weak_ptr<TTableClient::TImpl> weakClient,
- TKeepAliveCmd&& cmd, TDeletePredicate&& deletePredicate)
-{
- auto periodicCb = [this, weakClient, cmd=std::move(cmd), deletePredicate=std::move(deletePredicate)](NYql::TIssues&&, EStatus status) {
- if (status != EStatus::SUCCESS) {
- return false;
- }
-
- auto strongClient = weakClient.lock();
- if (!strongClient) {
- // No more clients alive - no need to run periodic,
- // moreover it is unsafe to touch this ptr!
- return false;
- } else {
- auto keepAliveBatchSize = PERIODIC_ACTION_BATCH_SIZE;
- TVector<std::unique_ptr<TSession::TImpl>> sessionsToTouch;
- sessionsToTouch.reserve(keepAliveBatchSize);
- TVector<std::unique_ptr<TSession::TImpl>> sessionsToDelete;
- sessionsToDelete.reserve(keepAliveBatchSize);
- auto now = TInstant::Now();
- {
- std::lock_guard guard(Mtx_);
- auto& sessions = Sessions_;
-
- auto it = sessions.begin();
- while (it != sessions.end() && keepAliveBatchSize--) {
- if (now < it->second->GetTimeToTouchFast())
- break;
-
- if (deletePredicate(it->second.get(), strongClient.get(), sessions.size())) {
- sessionsToDelete.emplace_back(std::move(it->second));
- } else {
- sessionsToTouch.emplace_back(std::move(it->second));
- }
- sessions.erase(it++);
- }
- UpdateStats();
- }
-
- for (auto& sessionImpl : sessionsToTouch) {
- if (sessionImpl) {
- Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_IDLE);
- TSession session(strongClient, std::shared_ptr<TSession::TImpl>(
- sessionImpl.release(),
- TSession::TImpl::GetSmartDeleter(strongClient)));
- cmd(session);
- }
- }
-
- for (auto& sessionImpl : sessionsToDelete) {
- if (sessionImpl) {
- Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_IDLE);
- TTableClient::TImpl::CloseAndDeleteSession(std::move(sessionImpl), strongClient);
- }
- }
- }
-
- return true;
- };
- return periodicCb;
-}
-
-i64 TSessionPoolImpl::GetActiveSessions() const {
- std::lock_guard guard(Mtx_);
- return ActiveSessions_;
-}
-
-i64 TSessionPoolImpl::GetActiveSessionsLimit() const {
- return MaxActiveSessions_;
-}
-
-i64 TSessionPoolImpl::GetCurrentPoolSize() const {
- std::lock_guard guard(Mtx_);
- return Sessions_.size();
-}
static bool IsSessionStatusRetriable(const TCreateSessionResult& res) {
switch (res.GetStatus()) {
@@ -3007,17 +1295,6 @@ TSessionInspectorFn TSession::TImpl::GetSessionInspector(
};
}
-void TSessionPoolImpl::SetStatCollector(NSdkStats::TStatCollector::TSessionPoolStatCollector statCollector) {
- ActiveSessionsCounter_.Set(statCollector.ActiveSessions);
- InPoolSessionsCounter_.Set(statCollector.InPoolSessions);
- FakeSessionsCounter_.Set(statCollector.FakeSessions);
-}
-
-void TSessionPoolImpl::UpdateStats() {
- ActiveSessionsCounter_.Apply(ActiveSessions_);
- InPoolSessionsCounter_.Apply(Sessions_.size());
-}
-
TTableClient::TTableClient(const TDriver& driver, const TClientSettings& settings)
: Impl_(new TImpl(CreateInternalInterface(driver), settings)) {
Impl_->StartPeriodicSessionPoolTask();
@@ -3504,13 +1781,14 @@ static void ConvertCreateTableSettingsToProto(const TCreateTableSettings& settin
////////////////////////////////////////////////////////////////////////////////
-TSession::TSession(std::shared_ptr<TTableClient::TImpl> client, const TString& sessionId, const TString& endpointId)
+TSession::TSession(std::shared_ptr<TTableClient::TImpl> client, const TString& sessionId, const TString& endpointId, bool isOwnedBySessionPool)
: Client_(client)
, SessionImpl_(new TSession::TImpl(
sessionId,
endpointId,
client->Settings_.UseQueryCache_,
- client->Settings_.QueryCacheSize_),
+ client->Settings_.QueryCacheSize_,
+ isOwnedBySessionPool),
TSession::TImpl::GetSmartDeleter(client))
{
if (endpointId) {
diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.h b/ydb/public/sdk/cpp/client/ydb_table/table.h
index b402fa1e15..d44500349e 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/table.h
+++ b/ydb/public/sdk/cpp/client/ydb_table/table.h
@@ -849,7 +849,6 @@ class TDescribeTableResult;
class TBeginTransactionResult;
class TCommitTransactionResult;
class TKeepAliveResult;
-class TSessionPoolImpl;
class TBulkUpsertResult;
class TReadRowsResult;
class TScanQueryPartIterator;
@@ -978,6 +977,7 @@ struct TStreamExecScanQuerySettings : public TRequestSettings<TStreamExecScanQue
};
class TSession;
+class TSessionPool;
struct TRetryState;
enum class EDataFormat {
@@ -988,7 +988,7 @@ enum class EDataFormat {
class TTableClient {
friend class TSession;
friend class TTransaction;
- friend class TSessionPoolImpl;
+ friend class TSessionPool;
friend class TRetryOperationContext;
public:
@@ -1579,7 +1579,7 @@ class TSession {
friend class TTableClient;
friend class TDataQuery;
friend class TTransaction;
- friend class TSessionPoolImpl;
+ friend class TSessionPool;
public:
//! The following methods perform corresponding calls.
@@ -1647,7 +1647,7 @@ public:
class TImpl;
private:
- TSession(std::shared_ptr<TTableClient::TImpl> client, const TString& sessionId, const TString& endpointId);
+ TSession(std::shared_ptr<TTableClient::TImpl> client, const TString& sessionId, const TString& endpointId, bool isOwnedBySessionPool);
TSession(std::shared_ptr<TTableClient::TImpl> client, std::shared_ptr<TSession::TImpl> SessionImpl_);
std::shared_ptr<TTableClient::TImpl> Client_;
diff --git a/ydb/services/ydb/CMakeLists.darwin-x86_64.txt b/ydb/services/ydb/CMakeLists.darwin-x86_64.txt
index 76c589603f..2077daf05e 100644
--- a/ydb/services/ydb/CMakeLists.darwin-x86_64.txt
+++ b/ydb/services/ydb/CMakeLists.darwin-x86_64.txt
@@ -9,6 +9,7 @@
find_package(OpenSSL REQUIRED)
add_subdirectory(index_ut)
add_subdirectory(sdk_credprovider_ut)
+add_subdirectory(sdk_sessions_pool_ut)
add_subdirectory(sdk_sessions_ut)
add_subdirectory(table_split_ut)
add_subdirectory(ut)
diff --git a/ydb/services/ydb/CMakeLists.linux-aarch64.txt b/ydb/services/ydb/CMakeLists.linux-aarch64.txt
index e481961748..3ab9453018 100644
--- a/ydb/services/ydb/CMakeLists.linux-aarch64.txt
+++ b/ydb/services/ydb/CMakeLists.linux-aarch64.txt
@@ -9,6 +9,7 @@
find_package(OpenSSL REQUIRED)
add_subdirectory(index_ut)
add_subdirectory(sdk_credprovider_ut)
+add_subdirectory(sdk_sessions_pool_ut)
add_subdirectory(sdk_sessions_ut)
add_subdirectory(table_split_ut)
add_subdirectory(ut)
diff --git a/ydb/services/ydb/CMakeLists.linux-x86_64.txt b/ydb/services/ydb/CMakeLists.linux-x86_64.txt
index e481961748..3ab9453018 100644
--- a/ydb/services/ydb/CMakeLists.linux-x86_64.txt
+++ b/ydb/services/ydb/CMakeLists.linux-x86_64.txt
@@ -9,6 +9,7 @@
find_package(OpenSSL REQUIRED)
add_subdirectory(index_ut)
add_subdirectory(sdk_credprovider_ut)
+add_subdirectory(sdk_sessions_pool_ut)
add_subdirectory(sdk_sessions_ut)
add_subdirectory(table_split_ut)
add_subdirectory(ut)
diff --git a/ydb/services/ydb/CMakeLists.windows-x86_64.txt b/ydb/services/ydb/CMakeLists.windows-x86_64.txt
index 76c589603f..2077daf05e 100644
--- a/ydb/services/ydb/CMakeLists.windows-x86_64.txt
+++ b/ydb/services/ydb/CMakeLists.windows-x86_64.txt
@@ -9,6 +9,7 @@
find_package(OpenSSL REQUIRED)
add_subdirectory(index_ut)
add_subdirectory(sdk_credprovider_ut)
+add_subdirectory(sdk_sessions_pool_ut)
add_subdirectory(sdk_sessions_ut)
add_subdirectory(table_split_ut)
add_subdirectory(ut)
diff --git a/ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.darwin-x86_64.txt b/ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..76c1e3815f
--- /dev/null
+++ b/ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,81 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-services-ydb-sdk_sessions_pool_ut)
+target_compile_options(ydb-services-ydb-sdk_sessions_pool_ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-services-ydb-sdk_sessions_pool_ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ydb
+)
+target_link_libraries(ydb-services-ydb-sdk_sessions_pool_ut PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ ydb-services-ydb
+ cpp-grpc-client
+ core-testlib-default
+ ydb-core-testlib
+ cpp-client-ydb_table
+)
+target_link_options(ydb-services-ydb-sdk_sessions_pool_ut PRIVATE
+ -Wl,-platform_version,macos,11.0,11.0
+ -fPIC
+ -fPIC
+ -framework
+ CoreFoundation
+)
+target_sources(ydb-services-ydb-sdk_sessions_pool_ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ydb/sdk_sessions_pool_ut/sdk_sessions_pool_ut.cpp
+)
+set_property(
+ TARGET
+ ydb-services-ydb-sdk_sessions_pool_ut
+ PROPERTY
+ SPLIT_FACTOR
+ 10
+)
+add_yunittest(
+ NAME
+ ydb-services-ydb-sdk_sessions_pool_ut
+ TEST_TARGET
+ ydb-services-ydb-sdk_sessions_pool_ut
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-services-ydb-sdk_sessions_pool_ut
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-services-ydb-sdk_sessions_pool_ut
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-services-ydb-sdk_sessions_pool_ut
+ PROPERTY
+ TIMEOUT
+ 300
+)
+target_allocator(ydb-services-ydb-sdk_sessions_pool_ut
+ system_allocator
+)
+vcs_info(ydb-services-ydb-sdk_sessions_pool_ut)
diff --git a/ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.linux-aarch64.txt b/ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..307872510d
--- /dev/null
+++ b/ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,84 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-services-ydb-sdk_sessions_pool_ut)
+target_compile_options(ydb-services-ydb-sdk_sessions_pool_ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-services-ydb-sdk_sessions_pool_ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ydb
+)
+target_link_libraries(ydb-services-ydb-sdk_sessions_pool_ut PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-testing-unittest_main
+ ydb-services-ydb
+ cpp-grpc-client
+ core-testlib-default
+ ydb-core-testlib
+ cpp-client-ydb_table
+)
+target_link_options(ydb-services-ydb-sdk_sessions_pool_ut PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(ydb-services-ydb-sdk_sessions_pool_ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ydb/sdk_sessions_pool_ut/sdk_sessions_pool_ut.cpp
+)
+set_property(
+ TARGET
+ ydb-services-ydb-sdk_sessions_pool_ut
+ PROPERTY
+ SPLIT_FACTOR
+ 10
+)
+add_yunittest(
+ NAME
+ ydb-services-ydb-sdk_sessions_pool_ut
+ TEST_TARGET
+ ydb-services-ydb-sdk_sessions_pool_ut
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-services-ydb-sdk_sessions_pool_ut
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-services-ydb-sdk_sessions_pool_ut
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-services-ydb-sdk_sessions_pool_ut
+ PROPERTY
+ TIMEOUT
+ 300
+)
+target_allocator(ydb-services-ydb-sdk_sessions_pool_ut
+ cpp-malloc-jemalloc
+)
+vcs_info(ydb-services-ydb-sdk_sessions_pool_ut)
diff --git a/ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.linux-x86_64.txt b/ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..d3a3ab6950
--- /dev/null
+++ b/ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,86 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-services-ydb-sdk_sessions_pool_ut)
+target_compile_options(ydb-services-ydb-sdk_sessions_pool_ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-services-ydb-sdk_sessions_pool_ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ydb
+)
+target_link_libraries(ydb-services-ydb-sdk_sessions_pool_ut PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ ydb-services-ydb
+ cpp-grpc-client
+ core-testlib-default
+ ydb-core-testlib
+ cpp-client-ydb_table
+)
+target_link_options(ydb-services-ydb-sdk_sessions_pool_ut PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(ydb-services-ydb-sdk_sessions_pool_ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ydb/sdk_sessions_pool_ut/sdk_sessions_pool_ut.cpp
+)
+set_property(
+ TARGET
+ ydb-services-ydb-sdk_sessions_pool_ut
+ PROPERTY
+ SPLIT_FACTOR
+ 10
+)
+add_yunittest(
+ NAME
+ ydb-services-ydb-sdk_sessions_pool_ut
+ TEST_TARGET
+ ydb-services-ydb-sdk_sessions_pool_ut
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-services-ydb-sdk_sessions_pool_ut
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-services-ydb-sdk_sessions_pool_ut
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-services-ydb-sdk_sessions_pool_ut
+ PROPERTY
+ TIMEOUT
+ 300
+)
+target_allocator(ydb-services-ydb-sdk_sessions_pool_ut
+ cpp-malloc-tcmalloc
+ libs-tcmalloc-no_percpu_cache
+)
+vcs_info(ydb-services-ydb-sdk_sessions_pool_ut)
diff --git a/ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.txt b/ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.windows-x86_64.txt b/ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..60cb26ee02
--- /dev/null
+++ b/ydb/services/ydb/sdk_sessions_pool_ut/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,74 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-services-ydb-sdk_sessions_pool_ut)
+target_compile_options(ydb-services-ydb-sdk_sessions_pool_ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-services-ydb-sdk_sessions_pool_ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ydb
+)
+target_link_libraries(ydb-services-ydb-sdk_sessions_pool_ut PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ ydb-services-ydb
+ cpp-grpc-client
+ core-testlib-default
+ ydb-core-testlib
+ cpp-client-ydb_table
+)
+target_sources(ydb-services-ydb-sdk_sessions_pool_ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ydb/sdk_sessions_pool_ut/sdk_sessions_pool_ut.cpp
+)
+set_property(
+ TARGET
+ ydb-services-ydb-sdk_sessions_pool_ut
+ PROPERTY
+ SPLIT_FACTOR
+ 10
+)
+add_yunittest(
+ NAME
+ ydb-services-ydb-sdk_sessions_pool_ut
+ TEST_TARGET
+ ydb-services-ydb-sdk_sessions_pool_ut
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-services-ydb-sdk_sessions_pool_ut
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-services-ydb-sdk_sessions_pool_ut
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-services-ydb-sdk_sessions_pool_ut
+ PROPERTY
+ TIMEOUT
+ 300
+)
+target_allocator(ydb-services-ydb-sdk_sessions_pool_ut
+ system_allocator
+)
+vcs_info(ydb-services-ydb-sdk_sessions_pool_ut)
diff --git a/ydb/services/ydb/sdk_sessions_pool_ut/sdk_sessions_pool_ut.cpp b/ydb/services/ydb/sdk_sessions_pool_ut/sdk_sessions_pool_ut.cpp
new file mode 100644
index 0000000000..97ae2d6aa3
--- /dev/null
+++ b/ydb/services/ydb/sdk_sessions_pool_ut/sdk_sessions_pool_ut.cpp
@@ -0,0 +1,448 @@
+#include "ydb_common_ut.h"
+
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+#include <ydb/public/api/grpc/ydb_table_v1.grpc.pb.h>
+
+#include <util/thread/pool.h>
+
+#include <random>
+#include <thread>
+
+using namespace NYdb;
+using namespace NYdb::NTable;
+
+class TDefaultTestSetup {
+public:
+ TDefaultTestSetup(ui32 maxActiveSessions)
+ : Driver_(NYdb::TDriver(
+ TDriverConfig().SetEndpoint(
+ TStringBuilder() << "localhost:" << Server_.GetPort()
+ )
+ ))
+ , Client_(
+ Driver_,
+ TClientSettings().SessionPoolSettings(
+ TSessionPoolSettings()
+ .MaxActiveSessions(maxActiveSessions)
+ .KeepAliveIdleThreshold(TDuration::MilliSeconds(10))
+ .CloseIdleThreshold(TDuration::MilliSeconds(10))
+ )
+ )
+ {
+ }
+
+ ~TDefaultTestSetup() {
+ Driver_.Stop(true);
+ }
+
+ NYdb::NTable::TTableClient& GetClient() {
+ return Client_;
+ }
+
+private:
+ TKikimrWithGrpcAndRootSchema Server_;
+ NYdb::TDriver Driver_;
+ NYdb::NTable::TTableClient Client_;
+};
+
+
+enum class EAction: ui8 {
+ CreateFuture,
+ ExtractValue,
+ Return
+};
+using TPlan = TVector<std::pair<EAction, ui32>>;
+
+
+void CheckPlan(TPlan plan) {
+ THashMap<ui32, EAction> sessions;
+ for (const auto& [action, sessionId]: plan) {
+ if (action == EAction::CreateFuture) {
+ UNIT_ASSERT(!sessions.contains(sessionId));
+ } else {
+ UNIT_ASSERT(sessions.contains(sessionId));
+ switch (sessions.at(sessionId)) {
+ case EAction::CreateFuture: {
+ UNIT_ASSERT(action == EAction::ExtractValue);
+ break;
+ }
+ case EAction::ExtractValue: {
+ UNIT_ASSERT(action == EAction::Return);
+ break;
+ }
+ default: {
+ UNIT_ASSERT(false);
+ }
+ }
+ }
+ sessions[sessionId] = action;
+ }
+}
+
+void RunPlan(const TPlan& plan, NYdb::NTable::TTableClient& client) {
+ THashMap<ui32, NThreading::TFuture<NYdb::NTable::TCreateSessionResult>> sessionFutures;
+ THashMap<ui32, NYdb::NTable::TCreateSessionResult> sessions;
+
+ ui32 requestedSessions = 0;
+
+ for (const auto& [action, sessionId]: plan) {
+ switch (action) {
+ case EAction::CreateFuture: {
+ sessionFutures.emplace(sessionId, client.GetSession());
+ ++requestedSessions;
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ if (requestedSessions > client.GetActiveSessionsLimit()) {
+ UNIT_ASSERT(client.GetActiveSessionCount() == client.GetActiveSessionsLimit());
+ }
+ UNIT_ASSERT(!sessionFutures.at(sessionId).HasValue());
+ break;
+ }
+ case EAction::ExtractValue: {
+ auto it = sessionFutures.find(sessionId);
+ auto session = it->second.ExtractValueSync();
+ sessionFutures.erase(it);
+ sessions.emplace(sessionId, std::move(session));
+ break;
+ }
+ case EAction::Return: {
+ sessions.erase(sessionId);
+ --requestedSessions;
+ break;
+ }
+ }
+ UNIT_ASSERT(client.GetActiveSessionCount() <= client.GetActiveSessionsLimit());
+ UNIT_ASSERT(client.GetActiveSessionCount() >= static_cast<i64>(sessions.size()));
+ UNIT_ASSERT(client.GetActiveSessionCount() <= static_cast<i64>(sessions.size() + sessionFutures.size()));
+ }
+}
+
+int GetRand(std::mt19937& rng, int min, int max) {
+ std::uniform_int_distribution<std::mt19937::result_type> dist(min, max);
+ return dist(rng);
+}
+
+
+TPlan GenerateRandomPlan(ui32 numSessions) {
+ TPlan plan;
+ std::random_device dev;
+ std::mt19937 rng(dev());
+
+ for (ui32 i = 0; i < numSessions; ++i) {
+ std::uniform_int_distribution<std::mt19937::result_type> dist(0, plan.size());
+ ui32 prevPos = 0;
+ for (EAction action: {EAction::CreateFuture, EAction::ExtractValue, EAction::Return}) {
+ int pos = GetRand(rng, prevPos, plan.size());
+ plan.emplace(plan.begin() + pos, std::make_pair(action, i));
+ prevPos = pos + 1;
+ }
+ }
+ return plan;
+}
+
+
+Y_UNIT_TEST_SUITE(YdbSdkSessionsPool) {
+ Y_UNIT_TEST(Get1Session) {
+ TDefaultTestSetup setup(1);
+ auto& client = setup.GetClient();
+
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionsLimit(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), 0);
+
+ {
+ auto session = client.GetSession().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), 0);
+ }
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), 1);
+ }
+
+ void TestWaitQueue(NYdb::NTable::TTableClient& client, ui32 activeSessionsLimit) {
+ std::vector<NThreading::TFuture<NYdb::NTable::TCreateSessionResult>> sessionFutures;
+ std::vector<NYdb::NTable::TCreateSessionResult> sessions;
+
+ // exhaust the pool
+ for (ui32 i = 0; i < activeSessionsLimit; ++i) {
+ sessions.emplace_back(client.GetSession().ExtractValueSync());
+ }
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), activeSessionsLimit);
+
+ // next should be in the wait queue
+ for (ui32 i = 0; i < activeSessionsLimit * 10; ++i) {
+ sessionFutures.emplace_back(client.GetSession());
+ }
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), activeSessionsLimit);
+
+ // next should be a fake session
+ {
+ auto brokenSession = client.GetSession().ExtractValueSync();
+ UNIT_ASSERT(!brokenSession.IsSuccess());
+ }
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ for (auto& sessionFuture: sessionFutures) {
+ UNIT_ASSERT(!sessionFuture.HasValue());
+ }
+
+ for (auto& sessionFuture: sessionFutures) {
+ sessions.erase(sessions.begin());
+ sessions.emplace_back(sessionFuture.ExtractValueSync());
+ }
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), activeSessionsLimit);
+ }
+
+ Y_UNIT_TEST(WaitQueue1) {
+ ui32 activeSessionsLimit = 1;
+
+ TDefaultTestSetup setup(activeSessionsLimit);
+ auto& client = setup.GetClient();
+
+ TestWaitQueue(client, activeSessionsLimit);
+ }
+
+ Y_UNIT_TEST(WaitQueue10) {
+ ui32 activeSessionsLimit = 10;
+
+ TDefaultTestSetup setup(activeSessionsLimit);
+ auto& client = setup.GetClient();
+
+ TestWaitQueue(client, activeSessionsLimit);
+ }
+
+ Y_UNIT_TEST(RunSmallPlan) {
+ TDefaultTestSetup setup(1);
+ auto& client = setup.GetClient();
+
+ TPlan plan{
+ {EAction::CreateFuture, 1},
+ {EAction::ExtractValue, 1},
+ {EAction::CreateFuture, 2},
+ {EAction::Return, 1},
+ {EAction::ExtractValue, 2},
+ {EAction::Return, 2}
+ };
+ CheckPlan(plan);
+ RunPlan(plan, client);
+
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), 1);
+ }
+
+ Y_UNIT_TEST(CustomPlan) {
+ TDefaultTestSetup setup(1);
+ auto& client = setup.GetClient();
+
+ TPlan plan{
+ {EAction::CreateFuture, 1}
+ };
+ CheckPlan(plan);
+ RunPlan(plan, client);
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(10000));
+
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
+ }
+
+ ui32 RunStressTestSync(ui32 n, ui32 activeSessionsLimit, NYdb::NTable::TTableClient& client) {
+ std::vector<NThreading::TFuture<NYdb::NTable::TCreateSessionResult>> sessionFutures;
+ std::vector<NYdb::NTable::TCreateSessionResult> sessions;
+ std::mt19937 rng(0);
+ ui32 successCount = 0;
+
+ for (ui32 i = 0; i < activeSessionsLimit * 12; ++i) {
+ sessionFutures.emplace_back(client.GetSession());
+ }
+
+ for (ui32 i = 0; i < n; ++i) {
+ switch (static_cast<EAction>(GetRand(rng, 0, 2))) {
+ case EAction::CreateFuture: {
+ sessionFutures.emplace_back(client.GetSession());
+ break;
+ }
+ case EAction::ExtractValue: {
+ if (sessionFutures.empty()) {
+ break;
+ }
+ auto ind = GetRand(rng, 0, sessionFutures.size() - 1);
+ auto sessionFuture = sessionFutures[ind];
+ if (sessionFuture.HasValue()) {
+ auto session = sessionFuture.ExtractValueSync();
+ if (session.IsSuccess()) {
+ ++successCount;
+ }
+ sessions.emplace_back(std::move(session));
+ sessionFutures.erase(sessionFutures.begin() + ind);
+ break;
+ }
+ break;
+ }
+ case EAction::Return: {
+ if (sessions.empty()) {
+ break;
+ }
+ auto ind = GetRand(rng, 0, sessions.size() - 1);
+ sessions.erase(sessions.begin() + ind);
+ break;
+ }
+ }
+ }
+ return successCount;
+ }
+
+ Y_UNIT_TEST(StressTestSync1) {
+ ui32 activeSessionsLimit = 1;
+
+ TDefaultTestSetup setup(activeSessionsLimit);
+ auto& client = setup.GetClient();
+
+ RunStressTestSync(1000, activeSessionsLimit, client);
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(10000));
+
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), activeSessionsLimit);
+ }
+
+ Y_UNIT_TEST(StressTestSync10) {
+ ui32 activeSessionsLimit = 10;
+
+ TDefaultTestSetup setup(activeSessionsLimit);
+ auto& client = setup.GetClient();
+
+ RunStressTestSync(1000, activeSessionsLimit, client);
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(10000));
+
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), activeSessionsLimit);
+ }
+
+ ui32 RunStressTestAsync(ui32 n, ui32 nThreads, NYdb::NTable::TTableClient& client) {
+ std::atomic<ui32> successCount(0);
+ std::atomic<ui32> jobIndex(0);
+
+ auto job = [&client, &successCount, &jobIndex, n]() mutable {
+ std::mt19937 rng(++jobIndex);
+ for (ui32 i = 0; i < n; ++i) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(GetRand(rng, 1, 100)));
+ auto sessionFuture = client.GetSession();
+ std::this_thread::sleep_for(std::chrono::milliseconds(GetRand(rng, 1, 100)));
+ auto session = sessionFuture.ExtractValueSync();
+ std::this_thread::sleep_for(std::chrono::milliseconds(GetRand(rng, 1, 100)));
+ successCount += session.IsSuccess();
+ }
+ };
+
+ IThreadFactory* pool = SystemThreadFactory();
+ TVector<TAutoPtr<IThreadFactory::IThread>> threads;
+ threads.resize(nThreads);
+ for (ui32 i = 0; i < nThreads; i++) {
+ threads[i] = pool->Run(job);
+ }
+ for (ui32 i = 0; i < nThreads; i++) {
+ threads[i]->Join();
+ }
+
+ return successCount;
+ }
+
+ Y_UNIT_TEST(StressTestAsync1) {
+ ui32 activeSessionsLimit = 1;
+
+ TDefaultTestSetup setup(activeSessionsLimit);
+ auto& client = setup.GetClient();
+
+ RunStressTestAsync(100, 10, client);
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(10000));
+
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), activeSessionsLimit);
+ }
+
+ Y_UNIT_TEST(StressTestAsync10) {
+ ui32 activeSessionsLimit = 10;
+
+ TDefaultTestSetup setup(activeSessionsLimit);
+ auto& client = setup.GetClient();
+
+ RunStressTestAsync(1000, 10, client);
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(10000));
+
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), activeSessionsLimit);
+ }
+
+ void TestPeriodicTask(ui32 activeSessionsLimit, NYdb::NTable::TTableClient& client) {
+ std::vector<NThreading::TFuture<NYdb::NTable::TCreateSessionResult>> sessionFutures;
+ std::vector<NYdb::NTable::TCreateSessionResult> sessions;
+
+ for (ui32 i = 0; i < activeSessionsLimit; ++i) {
+ sessions.emplace_back(client.GetSession().ExtractValueSync());
+ UNIT_ASSERT_VALUES_EQUAL(sessions.back().IsSuccess(), true);
+ }
+
+ for (ui32 i = 0; i < activeSessionsLimit; ++i) {
+ sessionFutures.emplace_back(client.GetSession());
+ }
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+
+ for (auto& sessionFuture : sessionFutures) {
+ UNIT_ASSERT(!sessionFuture.HasValue());
+ }
+
+ // Wait for wait session timeout
+ std::this_thread::sleep_for(std::chrono::milliseconds(10000));
+
+ for (auto& sessionFuture : sessionFutures) {
+ UNIT_ASSERT(sessionFuture.HasValue());
+ UNIT_ASSERT(!sessionFuture.ExtractValueSync().IsSuccess());
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), activeSessionsLimit);
+
+ sessionFutures.clear();
+ sessions.clear();
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(10000));
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), activeSessionsLimit);
+ }
+
+ Y_UNIT_TEST(PeriodicTask1) {
+ ui32 activeSessionsLimit = 1;
+
+ TDefaultTestSetup setup(activeSessionsLimit);
+ auto& client = setup.GetClient();
+
+ TestPeriodicTask(activeSessionsLimit, client);
+ }
+
+ Y_UNIT_TEST(PeriodicTask10) {
+ ui32 activeSessionsLimit = 10;
+
+ TDefaultTestSetup setup(activeSessionsLimit);
+ auto& client = setup.GetClient();
+
+ TestPeriodicTask(activeSessionsLimit, client);
+ }
+
+ Y_UNIT_TEST(FailTest) {
+ // This test reproduces bug from KIKIMR-18063
+ TDefaultTestSetup setup(1);
+ auto& client = setup.GetClient();
+
+ auto sessionFromPool = client.GetSession().ExtractValueSync();
+ auto futureInWaitPool = client.GetSession();
+
+ {
+ auto standaloneSessionThatWillBeBroken = client.CreateSession().ExtractValueSync();
+ auto res = standaloneSessionThatWillBeBroken.GetSession().ExecuteDataQuery("SELECT COUNT(*) FROM `Root/Test`;",
+ TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
+ NYdb::NTable::TExecDataQuerySettings().ClientTimeout(TDuration::MicroSeconds(10))).GetValueSync();
+ }
+ }
+}