aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBulat <bulat@ydb.tech>2025-04-22 12:47:19 +0300
committerGitHub <noreply@github.com>2025-04-22 12:47:19 +0300
commit24a3abb1a338a5293e7b45a411f9f17e48c144d0 (patch)
tree9ee31d442206ee13c747148a7947ec7485ea080e
parente040fdf1cb9d749133e6ecfb3d61cc1ace3e0f40 (diff)
downloadydb-24a3abb1a338a5293e7b45a411f9f17e48c144d0.tar.gz
Moved sessions tests to C++ SDK repo (#17512)
-rw-r--r--ydb/public/sdk/cpp/tests/integration/sessions/main.cpp802
-rw-r--r--ydb/public/sdk/cpp/tests/integration/sessions/ya.make (renamed from ydb/services/ydb/sdk_sessions_ut/ya.make)9
-rw-r--r--ydb/public/sdk/cpp/tests/integration/sessions_pool/main.cpp367
-rw-r--r--ydb/public/sdk/cpp/tests/integration/sessions_pool/ya.make (renamed from ydb/services/ydb/sdk_sessions_pool_ut/ya.make)7
-rw-r--r--ydb/public/sdk/cpp/tests/integration/ya.make2
-rw-r--r--ydb/services/ydb/sdk_sessions_pool_ut/sdk_sessions_pool_ut.cpp448
-rw-r--r--ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp831
-rw-r--r--ydb/services/ydb/ya.make2
8 files changed, 1178 insertions, 1290 deletions
diff --git a/ydb/public/sdk/cpp/tests/integration/sessions/main.cpp b/ydb/public/sdk/cpp/tests/integration/sessions/main.cpp
new file mode 100644
index 00000000000..6a1af34fb27
--- /dev/null
+++ b/ydb/public/sdk/cpp/tests/integration/sessions/main.cpp
@@ -0,0 +1,802 @@
+#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h>
+#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/query/client.h>
+
+#include <ydb/public/api/grpc/ydb_table_v1.grpc.pb.h>
+#include <ydb/public/api/grpc/ydb_query_v1.grpc.pb.h>
+
+#include <ydb/public/sdk/cpp/src/library/grpc/client/grpc_client_low.h>
+
+#include <library/cpp/testing/gtest/gtest.h>
+
+#include <grpcpp/security/credentials.h>
+#include <grpcpp/create_channel.h>
+
+#include <random>
+#include <thread>
+
+using namespace NYdb;
+using namespace NYdb::NTable;
+
+namespace {
+
+void CreateTestTable(NYdb::TDriver& driver) {
+ NYdb::NTable::TTableClient client(driver);
+ auto sessionResult = client.GetSession().ExtractValueSync();
+ ASSERT_TRUE(sessionResult.IsSuccess());
+ auto session = sessionResult.GetSession();
+ auto result = session.ExecuteSchemeQuery(R"___(
+ CREATE TABLE `/local/t` (
+ Key Uint32,
+ Value String,
+ PRIMARY KEY (Key)
+ );
+ )___").ExtractValueSync();
+ ASSERT_TRUE(result.IsSuccess());
+ ASSERT_EQ(client.GetActiveSessionCount(), 1);
+}
+
+void WarmPoolCreateSession(NYdb::NQuery::TQueryClient& client, std::string& sessionId) {
+ auto sessionResponse = client.GetSession().ExtractValueSync();
+ ASSERT_TRUE(sessionResponse.IsSuccess());
+ auto session = sessionResponse.GetSession();
+ sessionId = session.GetId();
+ auto res = session.ExecuteQuery("SELECT * FROM `Root/Test`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
+
+ ASSERT_EQ(res.GetStatus(), EStatus::SUCCESS) << res.GetIssues().ToString();
+
+ TResultSetParser resultSet(res.GetResultSetParser(0));
+ ASSERT_EQ(resultSet.ColumnsCount(), 2u);
+}
+
+void WaitForSessionsInPool(NYdb::NQuery::TQueryClient& client, std::int64_t expected) {
+ int attempt = 10;
+ while (attempt--) {
+ if (client.GetCurrentPoolSize() == expected)
+ break;
+ Sleep(TDuration::MilliSeconds(100));
+ }
+ ASSERT_EQ(client.GetCurrentPoolSize(), expected);
+}
+
+}
+
+void CheckDelete(const NYdbGrpc::TGRpcClientConfig& clientConfig, const std::string& id, int expected, bool& allDoneOk) {
+ NYdbGrpc::TGRpcClientLow clientLow;
+ auto connection = clientLow.CreateGRpcServiceConnection<Ydb::Query::V1::QueryService>(clientConfig);
+
+ Ydb::Query::DeleteSessionRequest request;
+ request.set_session_id(id);
+
+ NYdbGrpc::TResponseCallback<Ydb::Query::DeleteSessionResponse> responseCb =
+ [&allDoneOk, expected](NYdbGrpc::TGrpcStatus&& grpcStatus, Ydb::Query::DeleteSessionResponse&& response) -> void {
+ ASSERT_FALSE(grpcStatus.InternalError);
+ ASSERT_EQ(grpcStatus.GRpcStatusCode, 0) << grpcStatus.Msg + " " + grpcStatus.Details;
+ allDoneOk &= (response.status() == expected);
+ if (!allDoneOk) {
+ std::cerr << "Expected status: " << expected << ", got response: " << response.DebugString() << std::endl;
+ }
+ };
+
+ connection->DoRequest(request, std::move(responseCb), &Ydb::Query::V1::QueryService::Stub::AsyncDeleteSession);
+}
+
+TEST(YdbSdkSessions, TestSessionPool) {
+ const std::string location = std::getenv("YDB_ENDPOINT");
+
+ auto driver = NYdb::TDriver(
+ TDriverConfig()
+ .SetEndpoint(location));
+
+ NYdb::NTable::TTableClient client(driver);
+ int count = 10;
+
+ std::unordered_set<std::string> sids;
+ while (count--) {
+ auto sessionResponse = client.GetSession().ExtractValueSync();
+ ASSERT_FALSE(sessionResponse.IsTransportError());
+ auto session = sessionResponse.GetSession();
+ sids.insert(session.GetId());
+ auto result = session.ExecuteDataQuery("SELECT 42;",
+ TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync();
+
+ ASSERT_EQ(result.GetStatus(), EStatus::SUCCESS);
+ ASSERT_EQ(result.GetEndpoint(), location);
+ }
+ // All requests used one session
+ ASSERT_EQ(sids.size(), 1u);
+ // No more session captured by client
+ ASSERT_EQ(client.GetActiveSessionCount(), 0u);
+
+ driver.Stop(true);
+}
+
+TEST(YdbSdkSessions, TestMultipleSessions) {
+ std::string location = std::getenv("YDB_ENDPOINT");
+
+ auto driver = NYdb::TDriver(
+ TDriverConfig()
+ .SetEndpoint(location));
+
+ NYdb::NTable::TTableClient client(driver);
+ int count = 10;
+
+ std::vector<TSession> sids;
+ std::vector<TAsyncDataQueryResult> results;
+ while (count--) {
+ auto sessionResponse = client.GetSession().ExtractValueSync();
+ ASSERT_FALSE(sessionResponse.IsTransportError());
+ auto session = sessionResponse.GetSession();
+ sids.push_back(session);
+ results.push_back(session.ExecuteDataQuery("SELECT 42;",
+ TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()));
+ }
+
+ NThreading::WaitExceptionOrAll(results).Wait();
+ ASSERT_EQ(client.GetActiveSessionCount(), 10);
+
+ for (auto& result : results) {
+ ASSERT_EQ(result.GetValue().GetStatus(), EStatus::SUCCESS);
+ }
+ sids.clear();
+ results.clear();
+
+ ASSERT_EQ(client.GetActiveSessionCount(), 0);
+
+ driver.Stop(true);
+}
+
+TEST(YdbSdkSessions, TestActiveSessionCountAfterBadSession) {
+ std::string location = std::getenv("YDB_ENDPOINT");
+
+ auto driver = NYdb::TDriver(
+ TDriverConfig()
+ .SetEndpoint(location));
+
+ NYdb::NTable::TTableClient client(driver);
+ int count = 10;
+
+ std::vector<TSession> sids;
+ std::vector<TAsyncDataQueryResult> results;
+ while (count--) {
+ auto sessionResponse = client.GetSession().ExtractValueSync();
+ ASSERT_FALSE(sessionResponse.IsTransportError());
+ auto session = sessionResponse.GetSession();
+ sids.push_back(session);
+ if (count == 0) {
+ // Force BAD session server response for ExecuteDataQuery
+ ASSERT_EQ(session.Close().GetValueSync().GetStatus(), EStatus::SUCCESS);
+ results.push_back(session.ExecuteDataQuery("SELECT 42;",
+ TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()));
+ } else {
+ results.push_back(session.ExecuteDataQuery("SELECT 42;",
+ TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()));
+ }
+ }
+
+ NThreading::WaitExceptionOrAll(results).Wait();
+ ASSERT_EQ(client.GetActiveSessionCount(), 10);
+
+ for (size_t i = 0; i < results.size(); i++) {
+ if (i == 9) {
+ ASSERT_EQ(results[i].GetValue().GetStatus(), EStatus::BAD_SESSION);
+ } else {
+ ASSERT_EQ(results[i].GetValue().GetStatus(), EStatus::SUCCESS);
+ }
+ }
+ sids.clear();
+ results.clear();
+
+ ASSERT_EQ(client.GetActiveSessionCount(), 0);
+
+ driver.Stop(true);
+}
+
+TEST(YdbSdkSessions, TestSdkFreeSessionAfterBadSessionQueryService) {
+ GTEST_SKIP() << "Test is failing right now";
+ std::string location = std::getenv("YDB_ENDPOINT");
+ auto clientConfig = NYdbGrpc::TGRpcClientConfig(location);
+
+ auto driver = NYdb::TDriver(
+ TDriverConfig()
+ .SetEndpoint(location));
+
+ CreateTestTable(driver);
+
+ NYdb::NQuery::TQueryClient client(driver);
+ std::string sessionId;
+ WarmPoolCreateSession(client, sessionId);
+ WaitForSessionsInPool(client, 1);
+
+ bool allDoneOk = true;
+ CheckDelete(clientConfig, sessionId, Ydb::StatusIds::SUCCESS, allDoneOk);
+ ASSERT_TRUE(allDoneOk);
+
+ {
+ auto sessionResponse = client.GetSession().ExtractValueSync();
+ ASSERT_TRUE(sessionResponse.IsSuccess());
+ auto session = sessionResponse.GetSession();
+ ASSERT_EQ(session.GetId(), sessionId);
+
+ auto res = session.ExecuteQuery("SELECT * FROM `/local/t`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
+
+ ASSERT_EQ(res.GetStatus(), EStatus::BAD_SESSION) << res.GetIssues().ToString();
+ }
+
+ WaitForSessionsInPool(client, 0);
+
+ {
+ auto sessionResponse = client.GetSession().ExtractValueSync();
+ ASSERT_TRUE(sessionResponse.IsSuccess());
+ auto session = sessionResponse.GetSession();
+ ASSERT_NE(session.GetId(), sessionId);
+ auto res = session.ExecuteQuery("SELECT * FROM `/local/t`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
+
+ ASSERT_EQ(res.GetStatus(), EStatus::SUCCESS) << res.GetIssues().ToString();
+ }
+
+ WaitForSessionsInPool(client, 1);
+
+ driver.Stop(true);
+}
+
+TEST(YdbSdkSessions, TestSdkFreeSessionAfterBadSessionQueryServiceStreamCall) {
+ GTEST_SKIP() << "Test is failing right now";
+ std::string location = std::getenv("YDB_ENDPOINT");
+ auto clientConfig = NYdbGrpc::TGRpcClientConfig(location);
+
+ auto driver = NYdb::TDriver(
+ TDriverConfig()
+ .SetEndpoint(location));
+
+ CreateTestTable(driver);
+
+ NYdb::NQuery::TQueryClient client(driver);
+ std::string sessionId;
+ WarmPoolCreateSession(client, sessionId);
+ WaitForSessionsInPool(client, 1);
+
+ bool allDoneOk = true;
+ CheckDelete(clientConfig, sessionId, Ydb::StatusIds::SUCCESS, allDoneOk);
+ ASSERT_TRUE(allDoneOk);
+
+ {
+ auto sessionResponse = client.GetSession().ExtractValueSync();
+ ASSERT_TRUE(sessionResponse.IsSuccess());
+ auto session = sessionResponse.GetSession();
+ ASSERT_EQ(session.GetId(), sessionId);
+
+ auto it = session.StreamExecuteQuery("SELECT * FROM `/local/t`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
+
+ ASSERT_EQ(it.GetStatus(), EStatus::SUCCESS) << it.GetIssues().ToString();
+
+ auto res = it.ReadNext().GetValueSync();
+ ASSERT_EQ(res.GetStatus(), EStatus::BAD_SESSION) << res.GetIssues().ToString();
+ }
+
+ WaitForSessionsInPool(client, 0);
+
+ {
+ auto sessionResponse = client.GetSession().ExtractValueSync();
+ ASSERT_TRUE(sessionResponse.IsSuccess());
+ auto session = sessionResponse.GetSession();
+ ASSERT_NE(session.GetId(), sessionId);
+
+ auto res = session.ExecuteQuery("SELECT * FROM `/local/t`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
+
+ ASSERT_EQ(res.GetStatus(), EStatus::SUCCESS) << res.GetIssues().ToString();
+ }
+
+ WaitForSessionsInPool(client, 1);
+
+ driver.Stop(true);
+}
+
+TEST(YdbSdkSessions, TestActiveSessionCountAfterTransportError) {
+ std::string location = std::getenv("YDB_ENDPOINT");
+
+ auto driver = NYdb::TDriver(
+ TDriverConfig()
+ .SetEndpoint(location));
+
+ NYdb::NTable::TTableClient client(driver);
+ int count = 100;
+
+ {
+ auto sessionResponse = client.GetSession().ExtractValueSync();
+ ASSERT_TRUE(sessionResponse.IsSuccess());
+ auto session = sessionResponse.GetSession();
+ auto result = session.ExecuteSchemeQuery(R"___(
+ CREATE TABLE `/local/t` (
+ Key Uint32,
+ Value String,
+ PRIMARY KEY (Key)
+ );
+ )___").ExtractValueSync();
+ ASSERT_TRUE(result.IsSuccess());
+ ASSERT_EQ(client.GetActiveSessionCount(), 1);
+ }
+
+ while (count--) {
+ auto sessionResponse = client.GetSession().ExtractValueSync();
+ ASSERT_FALSE(sessionResponse.IsTransportError());
+ auto session = sessionResponse.GetSession();
+
+ // Assume 10us is too small to execute query and get response
+ auto res = session.ExecuteDataQuery("SELECT COUNT(*) FROM `/local/t`;",
+ TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
+ NYdb::NTable::TExecDataQuerySettings().ClientTimeout(TDuration::MicroSeconds(10))).GetValueSync();
+ ASSERT_EQ(client.GetActiveSessionCount(), 1);
+ }
+
+ ASSERT_EQ(client.GetActiveSessionCount(), 0);
+ driver.Stop(true);
+}
+
+TEST(YdbSdkSessions, MultiThreadSync) {
+ std::string location = std::getenv("YDB_ENDPOINT");
+
+ auto driver = NYdb::TDriver(
+ TDriverConfig()
+ .SetEndpoint(location));
+
+ NYdb::NTable::TTableClient client(driver);
+ const int nThreads = 10;
+ const int nRequests = 1000;
+ auto job = [client]() mutable {
+ for (int i = 0; i < nRequests; i++) {
+ auto sessionResponse = client.GetSession().ExtractValueSync();
+ ASSERT_EQ(sessionResponse.GetStatus(), EStatus::SUCCESS);
+ }
+ };
+ std::vector<std::thread> threads;
+ for (int i = 0; i < nThreads; i++) {
+ threads.emplace_back(job);
+ }
+ for (auto& thread : threads) {
+ thread.join();
+ }
+ ASSERT_EQ(client.GetActiveSessionCount(), 0);
+ driver.Stop(true);
+}
+
+void EnsureCanExecQuery(NYdb::NTable::TSession session) {
+ auto execStatus = session.ExecuteDataQuery("SELECT 42;",
+ TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync().GetStatus();
+ ASSERT_EQ(execStatus, EStatus::SUCCESS);
+}
+
+void EnsureCanExecQuery(NYdb::NQuery::TSession session) {
+ auto execStatus = session.ExecuteQuery("SELECT 42;",
+ NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync().GetStatus();
+ ASSERT_EQ(execStatus, EStatus::SUCCESS);
+}
+
+template<typename TClient>
+void DoMultiThreadSessionPoolLimitSync() {
+ std::string location = std::getenv("YDB_ENDPOINT");
+
+ auto driver = NYdb::TDriver(
+ TDriverConfig()
+ .SetEndpoint(location));
+
+ const int maxActiveSessions = 45;
+ TClient client(driver,
+ typename TClient::TSettings()
+ .SessionPoolSettings(
+ typename TClient::TSettings::TSessionPoolSettings().MaxActiveSessions(maxActiveSessions)));
+
+ constexpr int nThreads = 100;
+ NYdb::EStatus statuses[nThreads];
+ std::vector<std::optional<typename TClient::TSession>> sessions;
+ sessions.resize(nThreads);
+ std::atomic<int> t = 0;
+ auto job = [client, &t, &statuses, &sessions]() mutable {
+ auto sessionResponse = client.GetSession().ExtractValueSync();
+ int i = ++t;
+ statuses[--i] = sessionResponse.GetStatus();
+ if (statuses[i] == EStatus::SUCCESS) {
+ EnsureCanExecQuery(sessionResponse.GetSession());
+ sessions[i] = sessionResponse.GetSession();
+ }
+ };
+
+ std::vector<std::thread> threads;
+ threads.resize(nThreads);
+ for (int i = 0; i < nThreads; i++) {
+ threads[i] = std::thread(job);
+ }
+ for (int i = 0; i < nThreads; i++) {
+ threads[i].join();
+ }
+
+ sessions.clear();
+
+ int successCount = 0;
+ int exhaustedCount = 0;
+ for (int i = 0; i < nThreads; i++) {
+ switch (statuses[i]) {
+ case EStatus::SUCCESS:
+ successCount++;
+ break;
+ case EStatus::CLIENT_RESOURCE_EXHAUSTED:
+ exhaustedCount++;
+ break;
+ default:
+ FAIL() << "Unexpected status code: " << static_cast<size_t>(statuses[i]);
+ }
+ }
+
+ ASSERT_EQ(client.GetActiveSessionCount(), 0);
+ ASSERT_EQ(successCount, maxActiveSessions);
+ ASSERT_EQ(exhaustedCount, nThreads - maxActiveSessions);
+ driver.Stop(true);
+}
+
+TEST(YdbSdkSessions, MultiThreadSessionPoolLimitSyncTableClient) {
+ DoMultiThreadSessionPoolLimitSync<NYdb::NTable::TTableClient>();
+}
+
+TEST(YdbSdkSessions, MultiThreadSessionPoolLimitSyncQueryClient) {
+ DoMultiThreadSessionPoolLimitSync<NYdb::NQuery::TQueryClient>();
+}
+
+NYdb::NTable::TAsyncDataQueryResult ExecQueryAsync(NYdb::NTable::TSession session, const std::string q) {
+ return session.ExecuteDataQuery(q,
+ TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx());
+}
+
+NYdb::NQuery::TAsyncExecuteQueryResult ExecQueryAsync(NYdb::NQuery::TSession session, const std::string q) {
+ return session.ExecuteQuery(q,
+ NYdb::NQuery::TTxControl::BeginTx().CommitTx());
+}
+
+template<typename T>
+void DoMultiThreadMultipleRequestsOnSharedSessions() {
+ std::string location = std::getenv("YDB_ENDPOINT");
+
+ auto driver = NYdb::TDriver(
+ TDriverConfig()
+ .SetEndpoint(location));
+
+ const int maxActiveSessions = 10;
+ typename T::TClient client(driver,
+ typename T::TClient::TSettings()
+ .SessionPoolSettings(
+ typename T::TClient::TSettings::TSessionPoolSettings().MaxActiveSessions(maxActiveSessions)));
+
+ constexpr int nThreads = 20;
+ constexpr int nRequests = 50;
+ std::array<std::vector<typename T::TResult>, nThreads> results;
+ std::atomic<int> t = 0;
+ std::atomic<int> validSessions = 0;
+ auto job = [client, &t, &results, &validSessions]() mutable {
+ auto sessionResponse = client.GetSession().ExtractValueSync();
+
+ int i = ++t;
+ std::vector<typename T::TResult>& r = results[--i];
+
+ if (sessionResponse.GetStatus() != EStatus::SUCCESS) {
+ return;
+ }
+ validSessions.fetch_add(1);
+
+ for (int i = 0; i < nRequests; i++) {
+ r.push_back(ExecQueryAsync(sessionResponse.GetSession(), "SELECT 42;"));
+ }
+ };
+
+ std::vector<std::thread> threads;
+ for (int i = 0; i < nThreads; i++) {
+ threads.emplace_back(job);
+ }
+ for (auto& thread : threads) {
+ thread.join();
+ }
+
+ for (auto& r : results) {
+ NThreading::WaitExceptionOrAll(r).Wait();
+ }
+ for (auto& r : results) {
+ if (!r.empty()) {
+ for (auto& asyncStatus : r) {
+ auto res = asyncStatus.GetValue();
+ if (!res.IsSuccess()) {
+ ASSERT_EQ(res.GetStatus(), EStatus::SESSION_BUSY);
+ }
+ }
+ }
+ }
+ ASSERT_EQ(client.GetActiveSessionCount(), maxActiveSessions);
+ auto curExpectedActive = maxActiveSessions;
+ auto empty = 0;
+ for (auto& r : results) {
+ if (!r.empty()) {
+ r.clear();
+ ASSERT_EQ(client.GetActiveSessionCount(), --curExpectedActive);
+ } else {
+ empty++;
+ }
+ }
+ ASSERT_EQ(empty, nThreads - maxActiveSessions);
+ ASSERT_EQ(client.GetActiveSessionCount(), 0);
+ driver.Stop(true);
+}
+
+TEST(YdbSdkSessions, MultiThreadMultipleRequestsOnSharedSessionsTableClient) {
+ struct TTypeHelper {
+ using TClient = NYdb::NTable::TTableClient;
+ using TResult = NYdb::NTable::TAsyncDataQueryResult;
+ };
+ DoMultiThreadMultipleRequestsOnSharedSessions<TTypeHelper>();
+}
+
+TEST(YdbSdkSessions, MultiThreadMultipleRequestsOnSharedSessionsQueryClient) {
+ GTEST_SKIP() << "Enable after interactive tx support";
+ struct TTypeHelper {
+ using TClient = NYdb::NQuery::TQueryClient;
+ using TResult = NYdb::NQuery::TAsyncExecuteQueryResult;
+ };
+ DoMultiThreadMultipleRequestsOnSharedSessions<TTypeHelper>();
+}
+
+TEST(YdbSdkSessions, SessionsServerLimit) {
+ GTEST_SKIP() << "Enable after accepting a pull request with merging configs";
+
+ NYdb::TDriver driver(TDriverConfig().SetEndpoint(std::getenv("YDB_ENDPOINT")));
+ NYdb::NTable::TTableClient client(driver);
+ auto sessionResult = client.CreateSession().ExtractValueSync();
+ ASSERT_EQ(client.GetActiveSessionCount(), 0);
+ ASSERT_EQ(sessionResult.GetStatus(), EStatus::SUCCESS);
+ auto session1 = sessionResult.GetSession();
+
+ sessionResult = client.CreateSession().ExtractValueSync();
+ ASSERT_EQ(client.GetActiveSessionCount(), 0);
+ ASSERT_EQ(sessionResult.GetStatus(), EStatus::SUCCESS);
+ auto session2 = sessionResult.GetSession();
+
+ sessionResult = client.CreateSession().ExtractValueSync();
+ sessionResult.GetIssues().PrintTo(Cerr);
+ ASSERT_EQ(client.GetActiveSessionCount(), 0);
+ ASSERT_EQ(sessionResult.GetStatus(), EStatus::OVERLOADED);
+
+ auto status = session1.Close().ExtractValueSync();
+ ASSERT_EQ(status.IsTransportError(), false);
+ ASSERT_EQ(client.GetActiveSessionCount(), 0);
+ ASSERT_EQ(status.GetStatus(), EStatus::SUCCESS);
+
+ auto result = session2.ExecuteDataQuery(R"___(
+ SELECT 1;
+ )___", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ ASSERT_EQ(client.GetActiveSessionCount(), 0);
+ ASSERT_EQ(result.GetStatus(), EStatus::SUCCESS);
+
+ sessionResult = client.CreateSession().ExtractValueSync();
+ ASSERT_EQ(sessionResult.GetStatus(), EStatus::SUCCESS);
+
+ sessionResult = client.CreateSession().ExtractValueSync();
+ sessionResult.GetIssues().PrintTo(Cerr);
+ ASSERT_EQ(sessionResult.GetStatus(), EStatus::OVERLOADED);
+ ASSERT_EQ(client.GetActiveSessionCount(), 0);
+}
+
+TEST(YdbSdkSessions, SessionsServerLimitWithSessionPool) {
+ GTEST_SKIP() << "Enable after accepting a pull request with merging configs";
+ NYdb::TDriver driver(TDriverConfig().SetEndpoint(std::getenv("YDB_ENDPOINT")));
+ NYdb::NTable::TTableClient client(driver);
+ auto sessionResult1 = client.GetSession().ExtractValueSync();
+ ASSERT_EQ(sessionResult1.GetStatus(), EStatus::SUCCESS);
+ ASSERT_EQ(client.GetActiveSessionCount(), 1);
+ auto session1 = sessionResult1.GetSession();
+
+ auto sessionResult2 = client.GetSession().ExtractValueSync();
+ ASSERT_EQ(sessionResult2.GetStatus(), EStatus::SUCCESS);
+ ASSERT_EQ(client.GetActiveSessionCount(), 2);
+ auto session2 = sessionResult2.GetSession();
+
+ {
+ auto sessionResult3 = client.GetSession().ExtractValueSync();
+ ASSERT_EQ(sessionResult3.GetStatus(), EStatus::OVERLOADED);
+ ASSERT_EQ(client.GetActiveSessionCount(), 3);
+ }
+ ASSERT_EQ(client.GetActiveSessionCount(), 2);
+
+ auto status = session1.Close().ExtractValueSync();
+ ASSERT_EQ(status.IsTransportError(), false);
+ ASSERT_EQ(status.GetStatus(), EStatus::SUCCESS);
+
+ // Close doesnt free session from user perspective,
+ // the value of ActiveSessionsCounter will be same after Close() call.
+ // Probably we want to chenge this contract
+ ASSERT_EQ(client.GetActiveSessionCount(), 2);
+
+ auto result = session2.ExecuteDataQuery(R"___(
+ SELECT 1;
+ )___", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ ASSERT_EQ(result.GetStatus(), EStatus::SUCCESS);
+
+ sessionResult1 = client.GetSession().ExtractValueSync();
+ ASSERT_EQ(sessionResult1.GetStatus(), EStatus::SUCCESS);
+ ASSERT_EQ(sessionResult1.GetSession().GetId().empty(), false);
+ ASSERT_EQ(client.GetActiveSessionCount(), 3);
+
+ auto sessionResult3 = client.GetSession().ExtractValueSync();
+ ASSERT_EQ(sessionResult3.GetStatus(), EStatus::OVERLOADED);
+ ASSERT_EQ(client.GetActiveSessionCount(), 4);
+
+ auto tmp = client.GetSession().ExtractValueSync();
+ ASSERT_EQ(client.GetActiveSessionCount(), 5);
+ sessionResult1 = tmp; // here we reset previous created session object,
+ // so perform close rpc call implicitly and delete it
+ ASSERT_EQ(sessionResult1.GetStatus(), EStatus::OVERLOADED);
+ ASSERT_EQ(client.GetActiveSessionCount(), 4);
+}
+
+TEST(YdbSdkSessions, CloseSessionAfterDriverDtorWithoutSessionPool) {
+ std::vector<std::string> sessionIds;
+ int iterations = 50;
+
+ while (iterations--) {
+ NYdb::TDriver driver(TDriverConfig().SetEndpoint(std::getenv("YDB_ENDPOINT")));
+ NYdb::NTable::TTableClient client(driver);
+ auto sessionResult = client.CreateSession().ExtractValueSync();
+ ASSERT_EQ(client.GetActiveSessionCount(), 0);
+ ASSERT_EQ(sessionResult.GetStatus(), EStatus::SUCCESS);
+ auto session1 = sessionResult.GetSession();
+ sessionIds.push_back(session1.GetId());
+ }
+
+ std::shared_ptr<grpc::Channel> channel;
+ channel = grpc::CreateChannel(std::getenv("YDB_ENDPOINT"), grpc::InsecureChannelCredentials());
+ auto stub = Ydb::Table::V1::TableService::NewStub(channel);
+ for (const auto& sessionId : sessionIds) {
+ grpc::ClientContext context;
+ Ydb::Table::KeepAliveRequest request;
+ request.set_session_id(sessionId);
+ Ydb::Table::KeepAliveResponse response;
+ auto status = stub->KeepAlive(&context, request, &response);
+ ASSERT_TRUE(status.ok());
+ auto deferred = response.operation();
+ ASSERT_TRUE(deferred.ready() == true);
+ ASSERT_EQ(deferred.status(), Ydb::StatusIds::BAD_SESSION);
+ }
+}
+
+TEST(YdbSdkSessions, CloseSessionWithSessionPoolExplicit) {
+ std::vector<std::string> sessionIds;
+ int iterations = 100;
+
+ while (iterations--) {
+ NYdb::TDriver driver(TDriverConfig().SetEndpoint(std::getenv("YDB_ENDPOINT")));
+ NYdb::NTable::TTableClient client(driver);
+ //TODO: remove this scope after session tracker implementation
+ {
+ auto sessionResult = client.GetSession().ExtractValueSync();
+ ASSERT_EQ(client.GetActiveSessionCount(), 1);
+ ASSERT_EQ(sessionResult.GetStatus(), EStatus::SUCCESS);
+ auto session1 = sessionResult.GetSession();
+ sessionIds.push_back(session1.GetId());
+
+ sessionResult = client.GetSession().ExtractValueSync();
+ ASSERT_EQ(client.GetActiveSessionCount(), 2);
+ ASSERT_EQ(sessionResult.GetStatus(), EStatus::SUCCESS);
+ // Here previous created session will be returnet to session pool
+ session1 = sessionResult.GetSession();
+ ASSERT_EQ(client.GetActiveSessionCount(), 1);
+ sessionIds.push_back(session1.GetId());
+ }
+
+ std::random_device rd;
+ std::mt19937 gen(rd());
+ std::uniform_int_distribution<> dis(0, 9);
+ if (dis(gen) == 5) {
+ client.Stop().Apply([client](NThreading::TFuture<void> future) {
+ EXPECT_EQ(client.GetActiveSessionCount(), 0);
+ return future;
+ }).Wait();
+ } else {
+ client.Stop().Wait();
+ }
+
+ if (iterations & 4) {
+ driver.Stop(true);
+ }
+ }
+
+ std::shared_ptr<grpc::Channel> channel;
+ channel = grpc::CreateChannel(std::getenv("YDB_ENDPOINT"), grpc::InsecureChannelCredentials());
+ auto stub = Ydb::Table::V1::TableService::NewStub(channel);
+ for (const auto& sessionId : sessionIds) {
+ grpc::ClientContext context;
+ Ydb::Table::KeepAliveRequest request;
+ request.set_session_id(sessionId);
+ Ydb::Table::KeepAliveResponse response;
+ auto status = stub->KeepAlive(&context, request, &response);
+ ASSERT_TRUE(status.ok());
+ auto deferred = response.operation();
+ ASSERT_TRUE(deferred.ready() == true);
+ ASSERT_EQ(deferred.status(), Ydb::StatusIds::BAD_SESSION);
+ }
+}
+
+TEST(YdbSdkSessions, CloseSessionWithSessionPoolExplicitDriverStopOnly) {
+ std::vector<std::string> sessionIds;
+ int iterations = 100;
+
+ while (iterations--) {
+ NYdb::TDriver driver(TDriverConfig().SetEndpoint(std::getenv("YDB_ENDPOINT")));
+ NYdb::NTable::TTableClient client(driver);
+ //TODO: remove this scope after session tracker implementation
+ {
+ auto sessionResult = client.GetSession().ExtractValueSync();
+ ASSERT_EQ(client.GetActiveSessionCount(), 1);
+ ASSERT_EQ(sessionResult.GetStatus(), EStatus::SUCCESS);
+ auto session1 = sessionResult.GetSession();
+ sessionIds.push_back(session1.GetId());
+
+ sessionResult = client.GetSession().ExtractValueSync();
+ ASSERT_EQ(client.GetActiveSessionCount(), 2);
+ ASSERT_EQ(sessionResult.GetStatus(), EStatus::SUCCESS);
+ // Here previous created session will be returnet to session pool
+ session1 = sessionResult.GetSession();
+ ASSERT_EQ(client.GetActiveSessionCount(), 1);
+ sessionIds.push_back(session1.GetId());
+ }
+ driver.Stop(true);
+ }
+
+ std::shared_ptr<grpc::ChannelInterface> channel;
+ channel = grpc::CreateChannel(std::getenv("YDB_ENDPOINT"), grpc::InsecureChannelCredentials());
+ auto stub = Ydb::Table::V1::TableService::NewStub(channel);
+ for (const auto& sessionId : sessionIds) {
+ grpc::ClientContext context;
+ Ydb::Table::KeepAliveRequest request;
+ request.set_session_id(sessionId);
+ Ydb::Table::KeepAliveResponse response;
+ auto status = stub->KeepAlive(&context, request, &response);
+ ASSERT_TRUE(status.ok());
+ auto deferred = response.operation();
+ ASSERT_TRUE(deferred.ready() == true);
+ ASSERT_EQ(deferred.status(), Ydb::StatusIds::BAD_SESSION);
+ }
+}
+
+TEST(YdbSdkSessions, CloseSessionWithSessionPoolFromDtors) {
+ std::vector<std::string> sessionIds;
+ int iterations = 100;
+
+ while (iterations--) {
+ NYdb::TDriver driver(TDriverConfig().SetEndpoint(std::getenv("YDB_ENDPOINT")));
+ NYdb::NTable::TTableClient client(driver);
+ //TODO: remove this scope after session tracker implementation
+ {
+ auto sessionResult = client.GetSession().ExtractValueSync();
+ ASSERT_EQ(client.GetActiveSessionCount(), 1);
+ ASSERT_EQ(sessionResult.GetStatus(), EStatus::SUCCESS);
+ auto session1 = sessionResult.GetSession();
+ sessionIds.push_back(session1.GetId());
+
+ sessionResult = client.GetSession().ExtractValueSync();
+ ASSERT_EQ(client.GetActiveSessionCount(), 2);
+ ASSERT_EQ(sessionResult.GetStatus(), EStatus::SUCCESS);
+ // Here previous created session will be returnet to session pool
+ session1 = sessionResult.GetSession();
+ ASSERT_EQ(client.GetActiveSessionCount(), 1);
+ sessionIds.push_back(session1.GetId());
+ }
+ }
+
+ std::shared_ptr<grpc::Channel> channel;
+ channel = grpc::CreateChannel(std::getenv("YDB_ENDPOINT"), grpc::InsecureChannelCredentials());
+ auto stub = Ydb::Table::V1::TableService::NewStub(channel);
+ for (const auto& sessionId : sessionIds) {
+ grpc::ClientContext context;
+ Ydb::Table::KeepAliveRequest request;
+ request.set_session_id(sessionId);
+ Ydb::Table::KeepAliveResponse response;
+ auto status = stub->KeepAlive(&context, request, &response);
+ ASSERT_TRUE(status.ok());
+ auto deferred = response.operation();
+ ASSERT_TRUE(deferred.ready() == true);
+ ASSERT_EQ(deferred.status(), Ydb::StatusIds::BAD_SESSION);
+ }
+}
diff --git a/ydb/services/ydb/sdk_sessions_ut/ya.make b/ydb/public/sdk/cpp/tests/integration/sessions/ya.make
index c62a0941b93..f263b4eb2b8 100644
--- a/ydb/services/ydb/sdk_sessions_ut/ya.make
+++ b/ydb/public/sdk/cpp/tests/integration/sessions/ya.make
@@ -1,4 +1,5 @@
-UNITTEST_FOR(ydb/services/ydb)
+GTEST()
+INCLUDE(${ARCADIA_ROOT}/ydb/public/tools/ydb_recipe/recipe.inc)
FORK_SUBTESTS()
@@ -11,15 +12,13 @@ ELSE()
ENDIF()
SRCS(
- sdk_sessions_ut.cpp
+ main.cpp
)
PEERDIR(
ydb/public/sdk/cpp/src/library/grpc/client
- ydb/core/testlib/default
- ydb/core/testlib
ydb/public/sdk/cpp/src/client/table
- ydb/public/lib/ut_helpers
+ ydb/public/sdk/cpp/src/client/query
)
YQL_LAST_ABI_VERSION()
diff --git a/ydb/public/sdk/cpp/tests/integration/sessions_pool/main.cpp b/ydb/public/sdk/cpp/tests/integration/sessions_pool/main.cpp
new file mode 100644
index 00000000000..c12937a25ad
--- /dev/null
+++ b/ydb/public/sdk/cpp/tests/integration/sessions_pool/main.cpp
@@ -0,0 +1,367 @@
+#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h>
+
+#include <ydb/public/api/grpc/ydb_table_v1.grpc.pb.h>
+
+#include <library/cpp/testing/gtest/gtest.h>
+
+#include <random>
+#include <thread>
+
+using namespace NYdb;
+using namespace NYdb::NTable;
+
+class YdbSdkSessionsPool : public ::testing::TestWithParam<ui32> {
+protected:
+ void SetUp() override {
+ ui32 maxActiveSessions = GetParam();
+ Driver = std::make_unique<NYdb::TDriver>(TDriverConfig().SetEndpoint(std::getenv("YDB_ENDPOINT")));
+
+ auto clientSettings = TClientSettings().SessionPoolSettings(
+ TSessionPoolSettings()
+ .MaxActiveSessions(maxActiveSessions)
+ .KeepAliveIdleThreshold(TDuration::MilliSeconds(10))
+ .CloseIdleThreshold(TDuration::MilliSeconds(10)));
+ Client = std::make_unique<NYdb::NTable::TTableClient>(*Driver, clientSettings);
+ }
+
+ void TearDown() override {
+ Driver->Stop(true);
+ }
+
+protected:
+ std::unique_ptr<NYdb::TDriver> Driver;
+ std::unique_ptr<NYdb::NTable::TTableClient> Client;
+};
+
+class YdbSdkSessionsPool1Session : public YdbSdkSessionsPool {};
+
+enum class EAction: ui8 {
+ CreateFuture,
+ ExtractValue,
+ Return
+};
+using TPlan = std::vector<std::pair<EAction, ui32>>;
+
+
+void CheckPlan(TPlan plan) {
+ std::unordered_map<ui32, EAction> sessions;
+ for (const auto& [action, sessionId]: plan) {
+ if (action == EAction::CreateFuture) {
+ ASSERT_FALSE(sessions.contains(sessionId));
+ } else {
+ ASSERT_TRUE(sessions.contains(sessionId));
+ switch (sessions.at(sessionId)) {
+ case EAction::CreateFuture: {
+ ASSERT_EQ(action, EAction::ExtractValue);
+ break;
+ }
+ case EAction::ExtractValue: {
+ ASSERT_EQ(action, EAction::Return);
+ break;
+ }
+ default: {
+ ASSERT_TRUE(false);
+ }
+ }
+ }
+ sessions[sessionId] = action;
+ }
+}
+
+void RunPlan(const TPlan& plan, NYdb::NTable::TTableClient& client) {
+ std::unordered_map<ui32, NThreading::TFuture<NYdb::NTable::TCreateSessionResult>> sessionFutures;
+ std::unordered_map<ui32, NYdb::NTable::TCreateSessionResult> sessions;
+
+ ui32 requestedSessions = 0;
+
+ for (const auto& [action, sessionId]: plan) {
+ switch (action) {
+ case EAction::CreateFuture: {
+ sessionFutures.emplace(sessionId, client.GetSession());
+ ++requestedSessions;
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ if (requestedSessions > client.GetActiveSessionsLimit()) {
+ ASSERT_EQ(client.GetActiveSessionCount(), client.GetActiveSessionsLimit());
+ }
+ ASSERT_FALSE(sessionFutures.at(sessionId).HasValue());
+ break;
+ }
+ case EAction::ExtractValue: {
+ auto it = sessionFutures.find(sessionId);
+ auto session = it->second.ExtractValueSync();
+ sessionFutures.erase(it);
+ sessions.emplace(sessionId, std::move(session));
+ break;
+ }
+ case EAction::Return: {
+ sessions.erase(sessionId);
+ --requestedSessions;
+ break;
+ }
+ }
+ ASSERT_LE(client.GetActiveSessionCount(), client.GetActiveSessionsLimit());
+ ASSERT_GE(client.GetActiveSessionCount(), static_cast<i64>(sessions.size()));
+ ASSERT_LE(client.GetActiveSessionCount(), static_cast<i64>(sessions.size() + sessionFutures.size()));
+ }
+}
+
+int GetRand(std::mt19937& rng, int min, int max) {
+ std::uniform_int_distribution<std::mt19937::result_type> dist(min, max);
+ return dist(rng);
+}
+
+
+TPlan GenerateRandomPlan(ui32 numSessions) {
+ TPlan plan;
+ std::random_device dev;
+ std::mt19937 rng(dev());
+
+ for (ui32 i = 0; i < numSessions; ++i) {
+ std::uniform_int_distribution<std::mt19937::result_type> dist(0, plan.size());
+ ui32 prevPos = 0;
+ for (EAction action: {EAction::CreateFuture, EAction::ExtractValue, EAction::Return}) {
+ int pos = GetRand(rng, prevPos, plan.size());
+ plan.emplace(plan.begin() + pos, std::make_pair(action, i));
+ prevPos = pos + 1;
+ }
+ }
+ return plan;
+}
+
+
+TEST_P(YdbSdkSessionsPool1Session, GetSession) {
+ ASSERT_EQ(Client->GetActiveSessionsLimit(), 1);
+ ASSERT_EQ(Client->GetActiveSessionCount(), 0);
+ ASSERT_EQ(Client->GetCurrentPoolSize(), 0);
+
+ {
+ auto session = Client->GetSession().ExtractValueSync();
+
+ ASSERT_EQ(session.GetStatus(), EStatus::SUCCESS);
+ ASSERT_EQ(Client->GetActiveSessionCount(), 1);
+ ASSERT_EQ(Client->GetCurrentPoolSize(), 0);
+ }
+
+ ASSERT_EQ(Client->GetActiveSessionCount(), 0);
+ ASSERT_EQ(Client->GetCurrentPoolSize(), 1);
+}
+
+void TestWaitQueue(NYdb::NTable::TTableClient& client, ui32 activeSessionsLimit) {
+ std::vector<NThreading::TFuture<NYdb::NTable::TCreateSessionResult>> sessionFutures;
+ std::vector<NYdb::NTable::TCreateSessionResult> sessions;
+
+ // exhaust the pool
+ for (ui32 i = 0; i < activeSessionsLimit; ++i) {
+ sessions.emplace_back(client.GetSession().ExtractValueSync());
+ }
+ ASSERT_EQ(client.GetActiveSessionCount(), activeSessionsLimit);
+
+ // next should be in the wait queue
+ for (ui32 i = 0; i < activeSessionsLimit * 10; ++i) {
+ sessionFutures.emplace_back(client.GetSession());
+ }
+ ASSERT_EQ(client.GetActiveSessionCount(), activeSessionsLimit);
+
+ // next should be a fake session
+ {
+ auto brokenSession = client.GetSession().ExtractValueSync();
+ ASSERT_FALSE(brokenSession.IsSuccess());
+ }
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ for (auto& sessionFuture: sessionFutures) {
+ ASSERT_FALSE(sessionFuture.HasValue());
+ }
+
+ for (auto& sessionFuture: sessionFutures) {
+ sessions.erase(sessions.begin());
+ sessions.emplace_back(sessionFuture.ExtractValueSync());
+ }
+ ASSERT_EQ(client.GetActiveSessionCount(), activeSessionsLimit);
+}
+
+TEST_P(YdbSdkSessionsPool, WaitQueue) {
+ TestWaitQueue(*Client, GetParam());
+}
+
+TEST_P(YdbSdkSessionsPool1Session, RunSmallPlan) {
+ TPlan plan{
+ {EAction::CreateFuture, 1},
+ {EAction::ExtractValue, 1},
+ {EAction::CreateFuture, 2},
+ {EAction::Return, 1},
+ {EAction::ExtractValue, 2},
+ {EAction::Return, 2}
+ };
+ CheckPlan(plan);
+ RunPlan(plan, *Client);
+
+ ASSERT_EQ(Client->GetActiveSessionCount(), 0);
+ ASSERT_EQ(Client->GetCurrentPoolSize(), 1);
+}
+
+TEST_P(YdbSdkSessionsPool1Session, CustomPlan) {
+ TPlan plan{
+ {EAction::CreateFuture, 1}
+ };
+ CheckPlan(plan);
+ RunPlan(plan, *Client);
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(10000));
+
+ ASSERT_EQ(Client->GetActiveSessionCount(), 0);
+}
+
+ui32 RunStressTestSync(ui32 n, ui32 activeSessionsLimit, NYdb::NTable::TTableClient& client) {
+ std::vector<NThreading::TFuture<NYdb::NTable::TCreateSessionResult>> sessionFutures;
+ std::vector<NYdb::NTable::TCreateSessionResult> sessions;
+ std::mt19937 rng(0);
+ ui32 successCount = 0;
+
+ for (ui32 i = 0; i < activeSessionsLimit * 12; ++i) {
+ sessionFutures.emplace_back(client.GetSession());
+ }
+
+ for (ui32 i = 0; i < n; ++i) {
+ switch (static_cast<EAction>(GetRand(rng, 0, 2))) {
+ case EAction::CreateFuture: {
+ sessionFutures.emplace_back(client.GetSession());
+ break;
+ }
+ case EAction::ExtractValue: {
+ if (sessionFutures.empty()) {
+ break;
+ }
+ auto ind = GetRand(rng, 0, sessionFutures.size() - 1);
+ auto sessionFuture = sessionFutures[ind];
+ if (sessionFuture.HasValue()) {
+ auto session = sessionFuture.ExtractValueSync();
+ if (session.IsSuccess()) {
+ ++successCount;
+ }
+ sessions.emplace_back(std::move(session));
+ sessionFutures.erase(sessionFutures.begin() + ind);
+ break;
+ }
+ break;
+ }
+ case EAction::Return: {
+ if (sessions.empty()) {
+ break;
+ }
+ auto ind = GetRand(rng, 0, sessions.size() - 1);
+ sessions.erase(sessions.begin() + ind);
+ break;
+ }
+ }
+ }
+ return successCount;
+}
+
+TEST_P(YdbSdkSessionsPool, StressTestSync) {
+ ui32 activeSessionsLimit = GetParam();
+
+ RunStressTestSync(1000, activeSessionsLimit, *Client);
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(10000));
+
+ ASSERT_EQ(Client->GetActiveSessionCount(), 0);
+ ASSERT_EQ(Client->GetCurrentPoolSize(), activeSessionsLimit);
+}
+
+ui32 RunStressTestAsync(ui32 n, ui32 nThreads, NYdb::NTable::TTableClient& client) {
+ std::atomic<ui32> successCount(0);
+ std::atomic<ui32> jobIndex(0);
+
+ auto job = [&client, &successCount, &jobIndex, n]() mutable {
+ std::mt19937 rng(++jobIndex);
+ for (ui32 i = 0; i < n; ++i) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(GetRand(rng, 1, 100)));
+ auto sessionFuture = client.GetSession();
+ std::this_thread::sleep_for(std::chrono::milliseconds(GetRand(rng, 1, 100)));
+ auto session = sessionFuture.ExtractValueSync();
+ std::this_thread::sleep_for(std::chrono::milliseconds(GetRand(rng, 1, 100)));
+ successCount += session.IsSuccess();
+ }
+ };
+
+ std::vector<std::thread> threads;
+ for (ui32 i = 0; i < nThreads; i++) {
+ threads.emplace_back(job);
+ }
+ for (auto& thread: threads) {
+ thread.join();
+ }
+
+ return successCount;
+}
+
+TEST_P(YdbSdkSessionsPool, StressTestAsync) {
+ ui32 activeSessionsLimit = GetParam();
+ ui32 iterations = (activeSessionsLimit == 1) ? 100 : 1000;
+
+ RunStressTestAsync(iterations, 10, *Client);
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(10000));
+
+ ASSERT_EQ(Client->GetActiveSessionCount(), 0);
+ ASSERT_EQ(Client->GetCurrentPoolSize(), activeSessionsLimit);
+}
+
+void TestPeriodicTask(ui32 activeSessionsLimit, NYdb::NTable::TTableClient& client) {
+ std::vector<NThreading::TFuture<NYdb::NTable::TCreateSessionResult>> sessionFutures;
+ std::vector<NYdb::NTable::TCreateSessionResult> sessions;
+
+ for (ui32 i = 0; i < activeSessionsLimit; ++i) {
+ sessions.emplace_back(client.GetSession().ExtractValueSync());
+ ASSERT_TRUE(sessions.back().IsSuccess());
+ }
+
+ for (ui32 i = 0; i < activeSessionsLimit; ++i) {
+ sessionFutures.emplace_back(client.GetSession());
+ }
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+
+ for (auto& sessionFuture : sessionFutures) {
+ ASSERT_FALSE(sessionFuture.HasValue());
+ }
+
+ // Wait for wait session timeout
+ std::this_thread::sleep_for(std::chrono::milliseconds(10000));
+
+ for (auto& sessionFuture : sessionFutures) {
+ ASSERT_TRUE(sessionFuture.HasValue());
+ ASSERT_FALSE(sessionFuture.ExtractValueSync().IsSuccess());
+ }
+
+ ASSERT_EQ(client.GetActiveSessionCount(), activeSessionsLimit);
+
+ sessionFutures.clear();
+ sessions.clear();
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(10000));
+ ASSERT_EQ(client.GetActiveSessionCount(), 0);
+ ASSERT_EQ(client.GetCurrentPoolSize(), activeSessionsLimit);
+}
+
+TEST_P(YdbSdkSessionsPool, PeriodicTask) {
+ TestPeriodicTask(GetParam(), *Client);
+}
+
+TEST_P(YdbSdkSessionsPool1Session, FailTest) {
+ // This test reproduces bug from KIKIMR-18063
+ auto sessionFromPool = Client->GetSession().ExtractValueSync();
+ auto futureInWaitPool = Client->GetSession();
+
+ {
+ auto standaloneSessionThatWillBeBroken = Client->CreateSession().ExtractValueSync();
+ auto res = standaloneSessionThatWillBeBroken.GetSession().ExecuteDataQuery("SELECT COUNT(*) FROM `Root/Test`;",
+ TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
+ NYdb::NTable::TExecDataQuerySettings().ClientTimeout(TDuration::MicroSeconds(10))).GetValueSync();
+ }
+}
+
+INSTANTIATE_TEST_SUITE_P(, YdbSdkSessionsPool, ::testing::Values(1, 10));
+
+INSTANTIATE_TEST_SUITE_P(, YdbSdkSessionsPool1Session, ::testing::Values(1));
diff --git a/ydb/services/ydb/sdk_sessions_pool_ut/ya.make b/ydb/public/sdk/cpp/tests/integration/sessions_pool/ya.make
index ae03a20c02e..998a6caec44 100644
--- a/ydb/services/ydb/sdk_sessions_pool_ut/ya.make
+++ b/ydb/public/sdk/cpp/tests/integration/sessions_pool/ya.make
@@ -1,4 +1,5 @@
-UNITTEST_FOR(ydb/services/ydb)
+GTEST()
+INCLUDE(${ARCADIA_ROOT}/ydb/public/tools/ydb_recipe/recipe.inc)
FORK_SUBTESTS()
@@ -11,13 +12,11 @@ ELSE()
ENDIF()
SRCS(
- sdk_sessions_pool_ut.cpp
+ main.cpp
)
PEERDIR(
ydb/public/sdk/cpp/src/library/grpc/client
- ydb/core/testlib/default
- ydb/core/testlib
ydb/public/sdk/cpp/src/client/table
)
diff --git a/ydb/public/sdk/cpp/tests/integration/ya.make b/ydb/public/sdk/cpp/tests/integration/ya.make
index e801ac6b63a..d55b85646fe 100644
--- a/ydb/public/sdk/cpp/tests/integration/ya.make
+++ b/ydb/public/sdk/cpp/tests/integration/ya.make
@@ -2,4 +2,6 @@ RECURSE(
basic_example
bulk_upsert
server_restart
+ sessions
+ sessions_pool
)
diff --git a/ydb/services/ydb/sdk_sessions_pool_ut/sdk_sessions_pool_ut.cpp b/ydb/services/ydb/sdk_sessions_pool_ut/sdk_sessions_pool_ut.cpp
deleted file mode 100644
index 0a60c556b63..00000000000
--- a/ydb/services/ydb/sdk_sessions_pool_ut/sdk_sessions_pool_ut.cpp
+++ /dev/null
@@ -1,448 +0,0 @@
-#include "ydb_common_ut.h"
-
-#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h>
-#include <ydb/public/api/grpc/ydb_table_v1.grpc.pb.h>
-
-#include <util/thread/pool.h>
-
-#include <random>
-#include <thread>
-
-using namespace NYdb;
-using namespace NYdb::NTable;
-
-class TDefaultTestSetup {
-public:
- TDefaultTestSetup(ui32 maxActiveSessions)
- : Driver_(NYdb::TDriver(
- TDriverConfig().SetEndpoint(
- TStringBuilder() << "localhost:" << Server_.GetPort()
- )
- ))
- , Client_(
- Driver_,
- TClientSettings().SessionPoolSettings(
- TSessionPoolSettings()
- .MaxActiveSessions(maxActiveSessions)
- .KeepAliveIdleThreshold(TDuration::MilliSeconds(10))
- .CloseIdleThreshold(TDuration::MilliSeconds(10))
- )
- )
- {
- }
-
- ~TDefaultTestSetup() {
- Driver_.Stop(true);
- }
-
- NYdb::NTable::TTableClient& GetClient() {
- return Client_;
- }
-
-private:
- TKikimrWithGrpcAndRootSchema Server_;
- NYdb::TDriver Driver_;
- NYdb::NTable::TTableClient Client_;
-};
-
-
-enum class EAction: ui8 {
- CreateFuture,
- ExtractValue,
- Return
-};
-using TPlan = TVector<std::pair<EAction, ui32>>;
-
-
-void CheckPlan(TPlan plan) {
- THashMap<ui32, EAction> sessions;
- for (const auto& [action, sessionId]: plan) {
- if (action == EAction::CreateFuture) {
- UNIT_ASSERT(!sessions.contains(sessionId));
- } else {
- UNIT_ASSERT(sessions.contains(sessionId));
- switch (sessions.at(sessionId)) {
- case EAction::CreateFuture: {
- UNIT_ASSERT(action == EAction::ExtractValue);
- break;
- }
- case EAction::ExtractValue: {
- UNIT_ASSERT(action == EAction::Return);
- break;
- }
- default: {
- UNIT_ASSERT(false);
- }
- }
- }
- sessions[sessionId] = action;
- }
-}
-
-void RunPlan(const TPlan& plan, NYdb::NTable::TTableClient& client) {
- THashMap<ui32, NThreading::TFuture<NYdb::NTable::TCreateSessionResult>> sessionFutures;
- THashMap<ui32, NYdb::NTable::TCreateSessionResult> sessions;
-
- ui32 requestedSessions = 0;
-
- for (const auto& [action, sessionId]: plan) {
- switch (action) {
- case EAction::CreateFuture: {
- sessionFutures.emplace(sessionId, client.GetSession());
- ++requestedSessions;
- std::this_thread::sleep_for(std::chrono::milliseconds(1));
- if (requestedSessions > client.GetActiveSessionsLimit()) {
- UNIT_ASSERT(client.GetActiveSessionCount() == client.GetActiveSessionsLimit());
- }
- UNIT_ASSERT(!sessionFutures.at(sessionId).HasValue());
- break;
- }
- case EAction::ExtractValue: {
- auto it = sessionFutures.find(sessionId);
- auto session = it->second.ExtractValueSync();
- sessionFutures.erase(it);
- sessions.emplace(sessionId, std::move(session));
- break;
- }
- case EAction::Return: {
- sessions.erase(sessionId);
- --requestedSessions;
- break;
- }
- }
- UNIT_ASSERT(client.GetActiveSessionCount() <= client.GetActiveSessionsLimit());
- UNIT_ASSERT(client.GetActiveSessionCount() >= static_cast<i64>(sessions.size()));
- UNIT_ASSERT(client.GetActiveSessionCount() <= static_cast<i64>(sessions.size() + sessionFutures.size()));
- }
-}
-
-int GetRand(std::mt19937& rng, int min, int max) {
- std::uniform_int_distribution<std::mt19937::result_type> dist(min, max);
- return dist(rng);
-}
-
-
-TPlan GenerateRandomPlan(ui32 numSessions) {
- TPlan plan;
- std::random_device dev;
- std::mt19937 rng(dev());
-
- for (ui32 i = 0; i < numSessions; ++i) {
- std::uniform_int_distribution<std::mt19937::result_type> dist(0, plan.size());
- ui32 prevPos = 0;
- for (EAction action: {EAction::CreateFuture, EAction::ExtractValue, EAction::Return}) {
- int pos = GetRand(rng, prevPos, plan.size());
- plan.emplace(plan.begin() + pos, std::make_pair(action, i));
- prevPos = pos + 1;
- }
- }
- return plan;
-}
-
-
-Y_UNIT_TEST_SUITE(YdbSdkSessionsPool) {
- Y_UNIT_TEST(Get1Session) {
- TDefaultTestSetup setup(1);
- auto& client = setup.GetClient();
-
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionsLimit(), 1);
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
- UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), 0);
-
- {
- auto session = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1);
- UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), 0);
- }
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
- UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), 1);
- }
-
- void TestWaitQueue(NYdb::NTable::TTableClient& client, ui32 activeSessionsLimit) {
- std::vector<NThreading::TFuture<NYdb::NTable::TCreateSessionResult>> sessionFutures;
- std::vector<NYdb::NTable::TCreateSessionResult> sessions;
-
- // exhaust the pool
- for (ui32 i = 0; i < activeSessionsLimit; ++i) {
- sessions.emplace_back(client.GetSession().ExtractValueSync());
- }
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), activeSessionsLimit);
-
- // next should be in the wait queue
- for (ui32 i = 0; i < activeSessionsLimit * 10; ++i) {
- sessionFutures.emplace_back(client.GetSession());
- }
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), activeSessionsLimit);
-
- // next should be a fake session
- {
- auto brokenSession = client.GetSession().ExtractValueSync();
- UNIT_ASSERT(!brokenSession.IsSuccess());
- }
-
- std::this_thread::sleep_for(std::chrono::milliseconds(1000));
- for (auto& sessionFuture: sessionFutures) {
- UNIT_ASSERT(!sessionFuture.HasValue());
- }
-
- for (auto& sessionFuture: sessionFutures) {
- sessions.erase(sessions.begin());
- sessions.emplace_back(sessionFuture.ExtractValueSync());
- }
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), activeSessionsLimit);
- }
-
- Y_UNIT_TEST(WaitQueue1) {
- ui32 activeSessionsLimit = 1;
-
- TDefaultTestSetup setup(activeSessionsLimit);
- auto& client = setup.GetClient();
-
- TestWaitQueue(client, activeSessionsLimit);
- }
-
- Y_UNIT_TEST(WaitQueue10) {
- ui32 activeSessionsLimit = 10;
-
- TDefaultTestSetup setup(activeSessionsLimit);
- auto& client = setup.GetClient();
-
- TestWaitQueue(client, activeSessionsLimit);
- }
-
- Y_UNIT_TEST(RunSmallPlan) {
- TDefaultTestSetup setup(1);
- auto& client = setup.GetClient();
-
- TPlan plan{
- {EAction::CreateFuture, 1},
- {EAction::ExtractValue, 1},
- {EAction::CreateFuture, 2},
- {EAction::Return, 1},
- {EAction::ExtractValue, 2},
- {EAction::Return, 2}
- };
- CheckPlan(plan);
- RunPlan(plan, client);
-
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
- UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), 1);
- }
-
- Y_UNIT_TEST(CustomPlan) {
- TDefaultTestSetup setup(1);
- auto& client = setup.GetClient();
-
- TPlan plan{
- {EAction::CreateFuture, 1}
- };
- CheckPlan(plan);
- RunPlan(plan, client);
-
- std::this_thread::sleep_for(std::chrono::milliseconds(10000));
-
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
- }
-
- ui32 RunStressTestSync(ui32 n, ui32 activeSessionsLimit, NYdb::NTable::TTableClient& client) {
- std::vector<NThreading::TFuture<NYdb::NTable::TCreateSessionResult>> sessionFutures;
- std::vector<NYdb::NTable::TCreateSessionResult> sessions;
- std::mt19937 rng(0);
- ui32 successCount = 0;
-
- for (ui32 i = 0; i < activeSessionsLimit * 12; ++i) {
- sessionFutures.emplace_back(client.GetSession());
- }
-
- for (ui32 i = 0; i < n; ++i) {
- switch (static_cast<EAction>(GetRand(rng, 0, 2))) {
- case EAction::CreateFuture: {
- sessionFutures.emplace_back(client.GetSession());
- break;
- }
- case EAction::ExtractValue: {
- if (sessionFutures.empty()) {
- break;
- }
- auto ind = GetRand(rng, 0, sessionFutures.size() - 1);
- auto sessionFuture = sessionFutures[ind];
- if (sessionFuture.HasValue()) {
- auto session = sessionFuture.ExtractValueSync();
- if (session.IsSuccess()) {
- ++successCount;
- }
- sessions.emplace_back(std::move(session));
- sessionFutures.erase(sessionFutures.begin() + ind);
- break;
- }
- break;
- }
- case EAction::Return: {
- if (sessions.empty()) {
- break;
- }
- auto ind = GetRand(rng, 0, sessions.size() - 1);
- sessions.erase(sessions.begin() + ind);
- break;
- }
- }
- }
- return successCount;
- }
-
- Y_UNIT_TEST(StressTestSync1) {
- ui32 activeSessionsLimit = 1;
-
- TDefaultTestSetup setup(activeSessionsLimit);
- auto& client = setup.GetClient();
-
- RunStressTestSync(1000, activeSessionsLimit, client);
-
- std::this_thread::sleep_for(std::chrono::milliseconds(10000));
-
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
- UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), activeSessionsLimit);
- }
-
- Y_UNIT_TEST(StressTestSync10) {
- ui32 activeSessionsLimit = 10;
-
- TDefaultTestSetup setup(activeSessionsLimit);
- auto& client = setup.GetClient();
-
- RunStressTestSync(1000, activeSessionsLimit, client);
-
- std::this_thread::sleep_for(std::chrono::milliseconds(10000));
-
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
- UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), activeSessionsLimit);
- }
-
- ui32 RunStressTestAsync(ui32 n, ui32 nThreads, NYdb::NTable::TTableClient& client) {
- std::atomic<ui32> successCount(0);
- std::atomic<ui32> jobIndex(0);
-
- auto job = [&client, &successCount, &jobIndex, n]() mutable {
- std::mt19937 rng(++jobIndex);
- for (ui32 i = 0; i < n; ++i) {
- std::this_thread::sleep_for(std::chrono::milliseconds(GetRand(rng, 1, 100)));
- auto sessionFuture = client.GetSession();
- std::this_thread::sleep_for(std::chrono::milliseconds(GetRand(rng, 1, 100)));
- auto session = sessionFuture.ExtractValueSync();
- std::this_thread::sleep_for(std::chrono::milliseconds(GetRand(rng, 1, 100)));
- successCount += session.IsSuccess();
- }
- };
-
- IThreadFactory* pool = SystemThreadFactory();
- TVector<TAutoPtr<IThreadFactory::IThread>> threads;
- threads.resize(nThreads);
- for (ui32 i = 0; i < nThreads; i++) {
- threads[i] = pool->Run(job);
- }
- for (ui32 i = 0; i < nThreads; i++) {
- threads[i]->Join();
- }
-
- return successCount;
- }
-
- Y_UNIT_TEST(StressTestAsync1) {
- ui32 activeSessionsLimit = 1;
-
- TDefaultTestSetup setup(activeSessionsLimit);
- auto& client = setup.GetClient();
-
- RunStressTestAsync(100, 10, client);
-
- std::this_thread::sleep_for(std::chrono::milliseconds(10000));
-
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
- UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), activeSessionsLimit);
- }
-
- Y_UNIT_TEST(StressTestAsync10) {
- ui32 activeSessionsLimit = 10;
-
- TDefaultTestSetup setup(activeSessionsLimit);
- auto& client = setup.GetClient();
-
- RunStressTestAsync(1000, 10, client);
-
- std::this_thread::sleep_for(std::chrono::milliseconds(10000));
-
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
- UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), activeSessionsLimit);
- }
-
- void TestPeriodicTask(ui32 activeSessionsLimit, NYdb::NTable::TTableClient& client) {
- std::vector<NThreading::TFuture<NYdb::NTable::TCreateSessionResult>> sessionFutures;
- std::vector<NYdb::NTable::TCreateSessionResult> sessions;
-
- for (ui32 i = 0; i < activeSessionsLimit; ++i) {
- sessions.emplace_back(client.GetSession().ExtractValueSync());
- UNIT_ASSERT_VALUES_EQUAL(sessions.back().IsSuccess(), true);
- }
-
- for (ui32 i = 0; i < activeSessionsLimit; ++i) {
- sessionFutures.emplace_back(client.GetSession());
- }
-
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
-
- for (auto& sessionFuture : sessionFutures) {
- UNIT_ASSERT(!sessionFuture.HasValue());
- }
-
- // Wait for wait session timeout
- std::this_thread::sleep_for(std::chrono::milliseconds(10000));
-
- for (auto& sessionFuture : sessionFutures) {
- UNIT_ASSERT(sessionFuture.HasValue());
- UNIT_ASSERT(!sessionFuture.ExtractValueSync().IsSuccess());
- }
-
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), activeSessionsLimit);
-
- sessionFutures.clear();
- sessions.clear();
-
- std::this_thread::sleep_for(std::chrono::milliseconds(10000));
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
- UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), activeSessionsLimit);
- }
-
- Y_UNIT_TEST(PeriodicTask1) {
- ui32 activeSessionsLimit = 1;
-
- TDefaultTestSetup setup(activeSessionsLimit);
- auto& client = setup.GetClient();
-
- TestPeriodicTask(activeSessionsLimit, client);
- }
-
- Y_UNIT_TEST(PeriodicTask10) {
- ui32 activeSessionsLimit = 10;
-
- TDefaultTestSetup setup(activeSessionsLimit);
- auto& client = setup.GetClient();
-
- TestPeriodicTask(activeSessionsLimit, client);
- }
-
- Y_UNIT_TEST(FailTest) {
- // This test reproduces bug from KIKIMR-18063
- TDefaultTestSetup setup(1);
- auto& client = setup.GetClient();
-
- auto sessionFromPool = client.GetSession().ExtractValueSync();
- auto futureInWaitPool = client.GetSession();
-
- {
- auto standaloneSessionThatWillBeBroken = client.CreateSession().ExtractValueSync();
- auto res = standaloneSessionThatWillBeBroken.GetSession().ExecuteDataQuery("SELECT COUNT(*) FROM `Root/Test`;",
- TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
- NYdb::NTable::TExecDataQuerySettings().ClientTimeout(TDuration::MicroSeconds(10))).GetValueSync();
- }
- }
-}
diff --git a/ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp b/ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp
deleted file mode 100644
index 85312403144..00000000000
--- a/ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp
+++ /dev/null
@@ -1,831 +0,0 @@
-#include "ydb_common_ut.h"
-#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h>
-#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/query/client.h>
-
-#include <ydb/public/api/grpc/ydb_table_v1.grpc.pb.h>
-
-#include <ydb/public/lib/ut_helpers/ut_helpers_query.h>
-
-using namespace NYdb;
-using namespace NYdb::NTable;
-
-namespace {
-
-void CreateTestTable(NYdb::TDriver& driver) {
- NYdb::NTable::TTableClient client(driver);
- auto sessionResponse = client.GetSession().ExtractValueSync();
- UNIT_ASSERT(sessionResponse.IsSuccess());
- auto session = sessionResponse.GetSession();
- auto result = session.ExecuteSchemeQuery(R"___(
- CREATE TABLE `Root/Test` (
- Key Uint32,
- Value String,
- PRIMARY KEY (Key)
- );
- )___").ExtractValueSync();
- UNIT_ASSERT(result.IsSuccess());
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1);
-}
-
-TString WarmPoolCreateSession(NYdb::NQuery::TQueryClient& client) {
- TString sessionId;
- auto sessionResponse = client.GetSession().ExtractValueSync();
- UNIT_ASSERT(sessionResponse.IsSuccess());
- auto session = sessionResponse.GetSession();
- sessionId = session.GetId();
- auto res = session.ExecuteQuery("SELECT * FROM `Root/Test`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
-
- UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString());
-
- TResultSetParser resultSet(res.GetResultSetParser(0));
- UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2);
-
- return sessionId;
-}
-
-void WaitForSessionsInPool(NYdb::NQuery::TQueryClient& client, i64 expected) {
- int attempt = 10;
- while (attempt--) {
- if (client.GetCurrentPoolSize() == expected)
- break;
- Sleep(TDuration::MilliSeconds(100));
- }
- UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), expected);
-}
-
-}
-
-Y_UNIT_TEST_SUITE(YdbSdkSessions) {
- Y_UNIT_TEST(TestSessionPool) {
- TKikimrWithGrpcAndRootSchema server;
- ui16 grpc = server.GetPort();
-
- const TString location = TStringBuilder() << "localhost:" << grpc;
-
- auto driver = NYdb::TDriver(
- TDriverConfig()
- .SetEndpoint(location));
-
- NYdb::NTable::TTableClient client(driver);
- int count = 10;
-
- THashSet<std::string> sids;
- while (count--) {
- auto sessionResponse = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_EQUAL(sessionResponse.IsTransportError(), false);
- auto session = sessionResponse.GetSession();
- sids.insert(session.GetId());
- auto result = session.ExecuteDataQuery("SELECT 42;",
- TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync();
-
- UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
- UNIT_ASSERT_VALUES_EQUAL(result.GetEndpoint(), location);
- }
- // All requests used one session
- UNIT_ASSERT_VALUES_EQUAL(sids.size(), 1);
- // No more session captured by client
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
-
- driver.Stop(true);
- }
-
- Y_UNIT_TEST(TestMultipleSessions) {
- TKikimrWithGrpcAndRootSchema server;
- ui16 grpc = server.GetPort();
-
- TString location = TStringBuilder() << "localhost:" << grpc;
-
- auto driver = NYdb::TDriver(
- TDriverConfig()
- .SetEndpoint(location));
-
- NYdb::NTable::TTableClient client(driver);
- int count = 10;
-
- TVector<TSession> sids;
- TVector<TAsyncDataQueryResult> results;
- while (count--) {
- auto sessionResponse = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_EQUAL(sessionResponse.IsTransportError(), false);
- auto session = sessionResponse.GetSession();
- sids.push_back(session);
- results.push_back(session.ExecuteDataQuery("SELECT 42;",
- TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()));
- }
-
- NThreading::WaitExceptionOrAll(results).Wait();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 10);
-
- for (auto& result : results) {
- UNIT_ASSERT_EQUAL(result.GetValue().GetStatus(), EStatus::SUCCESS);
- }
- sids.clear();
- results.clear();
-
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
-
- driver.Stop(true);
- }
-
- Y_UNIT_TEST(TestActiveSessionCountAfterBadSession) {
- TKikimrWithGrpcAndRootSchema server;
- ui16 grpc = server.GetPort();
-
- TString location = TStringBuilder() << "localhost:" << grpc;
-
- auto driver = NYdb::TDriver(
- TDriverConfig()
- .SetEndpoint(location));
-
- NYdb::NTable::TTableClient client(driver);
- int count = 10;
-
- TVector<TSession> sids;
- TVector<TAsyncDataQueryResult> results;
- while (count--) {
- auto sessionResponse = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_EQUAL(sessionResponse.IsTransportError(), false);
- auto session = sessionResponse.GetSession();
- sids.push_back(session);
- if (count == 0) {
- // Force BAD session server response for ExecuteDataQuery
- UNIT_ASSERT_EQUAL(session.Close().GetValueSync().GetStatus(), EStatus::SUCCESS);
- results.push_back(session.ExecuteDataQuery("SELECT 42;",
- TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()));
- } else {
- results.push_back(session.ExecuteDataQuery("SELECT 42;",
- TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()));
- }
- }
-
- NThreading::WaitExceptionOrAll(results).Wait();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 10);
-
- for (size_t i = 0; i < results.size(); i++) {
- if (i == 9) {
- UNIT_ASSERT_EQUAL(results[i].GetValue().GetStatus(), EStatus::BAD_SESSION);
- } else {
- UNIT_ASSERT_EQUAL(results[i].GetValue().GetStatus(), EStatus::SUCCESS);
- }
- }
- sids.clear();
- results.clear();
-
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
-
- driver.Stop(true);
- }
-
- Y_UNIT_TEST(TestSdkFreeSessionAfterBadSessionQueryService) {
- TKikimrWithGrpcAndRootSchema server;
- ui16 grpc = server.GetPort();
-
- TString location = TStringBuilder() << "localhost:" << grpc;
- auto clientConfig = NGRpcProxy::TGRpcClientConfig(location);
-
- auto driver = NYdb::TDriver(
- TDriverConfig()
- .SetEndpoint(location));
-
- CreateTestTable(driver);
-
- NYdb::NQuery::TQueryClient client(driver);
- TString sessionId = WarmPoolCreateSession(client);
- WaitForSessionsInPool(client, 1);
-
- bool allDoneOk = true;
- NTestHelpers::CheckDelete(clientConfig, sessionId, Ydb::StatusIds::SUCCESS, allDoneOk);
- UNIT_ASSERT(allDoneOk);
-
- {
- auto sessionResponse = client.GetSession().ExtractValueSync();
- UNIT_ASSERT(sessionResponse.IsSuccess());
- auto session = sessionResponse.GetSession();
- UNIT_ASSERT_VALUES_EQUAL(session.GetId(), sessionId);
-
- auto res = session.ExecuteQuery("SELECT * FROM `Root/Test`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
-
- UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::BAD_SESSION, res.GetIssues().ToString());
- }
-
- WaitForSessionsInPool(client, 0);
-
- {
- auto sessionResponse = client.GetSession().ExtractValueSync();
- UNIT_ASSERT(sessionResponse.IsSuccess());
- auto session = sessionResponse.GetSession();
- UNIT_ASSERT_VALUES_UNEQUAL(session.GetId(), sessionId);
- auto res = session.ExecuteQuery("SELECT * FROM `Root/Test`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
-
- UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString());
- }
-
- WaitForSessionsInPool(client, 1);
-
- driver.Stop(true);
- }
-
- Y_UNIT_TEST(TestSdkFreeSessionAfterBadSessionQueryServiceStreamCall) {
- TKikimrWithGrpcAndRootSchema server;
- ui16 grpc = server.GetPort();
-
- TString location = TStringBuilder() << "localhost:" << grpc;
- auto clientConfig = NGRpcProxy::TGRpcClientConfig(location);
-
- auto driver = NYdb::TDriver(
- TDriverConfig()
- .SetEndpoint(location));
-
- CreateTestTable(driver);
-
- NYdb::NQuery::TQueryClient client(driver);
- TString sessionId = WarmPoolCreateSession(client);
- WaitForSessionsInPool(client, 1);
-
- bool allDoneOk = true;
- NTestHelpers::CheckDelete(clientConfig, sessionId, Ydb::StatusIds::SUCCESS, allDoneOk);
- UNIT_ASSERT(allDoneOk);
-
- {
- auto sessionResponse = client.GetSession().ExtractValueSync();
- UNIT_ASSERT(sessionResponse.IsSuccess());
- auto session = sessionResponse.GetSession();
- UNIT_ASSERT_VALUES_EQUAL(session.GetId(), sessionId);
-
- auto it = session.StreamExecuteQuery("SELECT * FROM `Root/Test`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
-
- UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
-
- auto res = it.ReadNext().GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::BAD_SESSION, res.GetIssues().ToString());
- }
-
- WaitForSessionsInPool(client, 0);
-
- {
- auto sessionResponse = client.GetSession().ExtractValueSync();
- UNIT_ASSERT(sessionResponse.IsSuccess());
- auto session = sessionResponse.GetSession();
- UNIT_ASSERT_VALUES_UNEQUAL(session.GetId(), sessionId);
-
- auto res = session.ExecuteQuery("SELECT * FROM `Root/Test`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
-
- UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString());
- }
-
- WaitForSessionsInPool(client, 1);
-
- driver.Stop(true);
- }
-
- Y_UNIT_TEST(TestActiveSessionCountAfterTransportError) {
- TKikimrWithGrpcAndRootSchema server;
- ui16 grpc = server.GetPort();
-
- TString location = TStringBuilder() << "localhost:" << grpc;
-
- auto driver = NYdb::TDriver(
- TDriverConfig()
- .SetEndpoint(location));
-
- NYdb::NTable::TTableClient client(driver);
- int count = 100;
-
- {
- auto sessionResponse = client.GetSession().ExtractValueSync();
- UNIT_ASSERT(sessionResponse.IsSuccess());
- auto session = sessionResponse.GetSession();
- auto result = session.ExecuteSchemeQuery(R"___(
- CREATE TABLE `Root/Test` (
- Key Uint32,
- Value String,
- PRIMARY KEY (Key)
- );
- )___").ExtractValueSync();
- UNIT_ASSERT(result.IsSuccess());
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1);
- }
-
- while (count--) {
- auto sessionResponse = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_EQUAL(sessionResponse.IsTransportError(), false);
- auto session = sessionResponse.GetSession();
-
- // Assume 10us is too small to execute query and get response
- auto res = session.ExecuteDataQuery("SELECT COUNT(*) FROM `Root/Test`;",
- TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
- NYdb::NTable::TExecDataQuerySettings().ClientTimeout(TDuration::MicroSeconds(10))).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1);
- }
-
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
- driver.Stop(true);
- }
-
- Y_UNIT_TEST(MultiThreadSync) {
- TKikimrWithGrpcAndRootSchema server;
- ui16 grpc = server.GetPort();
-
- TString location = TStringBuilder() << "localhost:" << grpc;
-
- auto driver = NYdb::TDriver(
- TDriverConfig()
- .SetEndpoint(location));
-
- NYdb::NTable::TTableClient client(driver);
- const int nThreads = 10;
- const int nRequests = 1000;
- auto job = [client]() mutable {
- for (int i = 0; i < nRequests; i++) {
- auto sessionResponse = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_EQUAL(sessionResponse.GetStatus(), EStatus::SUCCESS);
- }
- };
- IThreadFactory* pool = SystemThreadFactory();
-
- TVector<TAutoPtr<IThreadFactory::IThread>> threads;
- threads.resize(nThreads);
- for (int i = 0; i < nThreads; i++) {
- threads[i] = pool->Run(job);
- }
- for (int i = 0; i < nThreads; i++) {
- threads[i]->Join();
- }
- UNIT_ASSERT_EQUAL(client.GetActiveSessionCount(), 0);
- driver.Stop(true);
- }
-
- void EnsureCanExecQuery(NYdb::NTable::TSession session) {
- auto execStatus = session.ExecuteDataQuery("SELECT 42;",
- TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync().GetStatus();
- UNIT_ASSERT_EQUAL(execStatus, EStatus::SUCCESS);
- }
-
- void EnsureCanExecQuery(NYdb::NQuery::TSession session) {
- auto execStatus = session.ExecuteQuery("SELECT 42;",
- NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync().GetStatus();
- UNIT_ASSERT_EQUAL(execStatus, EStatus::SUCCESS);
- }
-
- template<typename TClient>
- void DoMultiThreadSessionPoolLimitSync() {
- TKikimrWithGrpcAndRootSchema server;
- ui16 grpc = server.GetPort();
-
- TString location = TStringBuilder() << "localhost:" << grpc;
-
- auto driver = NYdb::TDriver(
- TDriverConfig()
- .SetEndpoint(location));
-
- const int maxActiveSessions = 45;
- TClient client(driver,
- typename TClient::TSettings()
- .SessionPoolSettings(
- typename TClient::TSettings::TSessionPoolSettings().MaxActiveSessions(maxActiveSessions)));
-
- constexpr int nThreads = 100;
- NYdb::EStatus statuses[nThreads];
- TVector<TMaybe<typename TClient::TSession>> sessions;
- sessions.resize(nThreads);
- TAtomic t = 0;
- auto job = [client, &t, &statuses, &sessions]() mutable {
- auto sessionResponse = client.GetSession().ExtractValueSync();
- int i = AtomicIncrement(t);
- statuses[--i] = sessionResponse.GetStatus();
- if (statuses[i] == EStatus::SUCCESS) {
- EnsureCanExecQuery(sessionResponse.GetSession());
- sessions[i] = sessionResponse.GetSession();
- }
- };
- IThreadFactory* pool = SystemThreadFactory();
-
- TVector<TAutoPtr<IThreadFactory::IThread>> threads;
- threads.resize(nThreads);
- for (int i = 0; i < nThreads; i++) {
- threads[i] = pool->Run(job);
- }
- for (int i = 0; i < nThreads; i++) {
- threads[i]->Join();
- }
-
- sessions.clear();
-
- int successCount = 0;
- int exhaustedCount = 0;
- for (int i = 0; i < nThreads; i++) {
- switch (statuses[i]) {
- case EStatus::SUCCESS:
- successCount++;
- break;
- case EStatus::CLIENT_RESOURCE_EXHAUSTED:
- exhaustedCount++;
- break;
- default:
- UNIT_ASSERT(false);
- }
- }
-
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
- UNIT_ASSERT_VALUES_EQUAL(successCount, maxActiveSessions);
- UNIT_ASSERT_VALUES_EQUAL(exhaustedCount, nThreads - maxActiveSessions);
- driver.Stop(true);
- }
-
- Y_UNIT_TEST(MultiThreadSessionPoolLimitSyncTableClient) {
- DoMultiThreadSessionPoolLimitSync<NYdb::NTable::TTableClient>();
- }
-
- Y_UNIT_TEST(MultiThreadSessionPoolLimitSyncQueryClient) {
- DoMultiThreadSessionPoolLimitSync<NYdb::NQuery::TQueryClient>();
- }
-
- NYdb::NTable::TAsyncDataQueryResult ExecQueryAsync(NYdb::NTable::TSession session, const TString q) {
- return session.ExecuteDataQuery(q,
- TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx());
- }
-
- NYdb::NQuery::TAsyncExecuteQueryResult ExecQueryAsync(NYdb::NQuery::TSession session, const TString q) {
- return session.ExecuteQuery(q,
- NYdb::NQuery::TTxControl::BeginTx().CommitTx());
- }
-
- template<typename T>
- void DoMultiThreadMultipleRequestsOnSharedSessions() {
- TKikimrWithGrpcAndRootSchema server;
- ui16 grpc = server.GetPort();
-
- TString location = TStringBuilder() << "localhost:" << grpc;
-
- auto driver = NYdb::TDriver(
- TDriverConfig()
- .SetEndpoint(location));
-
- const int maxActiveSessions = 10;
- typename T::TClient client(driver,
- typename T::TClient::TSettings()
- .SessionPoolSettings(
- typename T::TClient::TSettings::TSessionPoolSettings().MaxActiveSessions(maxActiveSessions)));
-
- constexpr int nThreads = 20;
- constexpr int nRequests = 50;
- std::array<TVector<typename T::TResult>, nThreads> results;
- TAtomic t = 0;
- TAtomic validSessions = 0;
- auto job = [client, &t, &results, &validSessions]() mutable {
- auto sessionResponse = client.GetSession().ExtractValueSync();
-
- int i = AtomicIncrement(t);
- TVector<typename T::TResult>& r = results[--i];
-
- if (sessionResponse.GetStatus() != EStatus::SUCCESS) {
- return;
- }
- AtomicIncrement(validSessions);
-
- for (int i = 0; i < nRequests; i++) {
- r.push_back(ExecQueryAsync(sessionResponse.GetSession(), "SELECT 42;"));
- }
- };
- IThreadFactory* pool = SystemThreadFactory();
-
- TVector<TAutoPtr<IThreadFactory::IThread>> threads;
- threads.resize(nThreads);
- for (int i = 0; i < nThreads; i++) {
- threads[i] = pool->Run(job);
- }
- for (int i = 0; i < nThreads; i++) {
- threads[i]->Join();
- }
-
- for (auto& r : results) {
- NThreading::WaitExceptionOrAll(r).Wait();
- }
- for (auto& r : results) {
- if (!r.empty()) {
- for (auto& asyncStatus : r) {
- auto res = asyncStatus.GetValue();
- if (!res.IsSuccess()) {
- UNIT_ASSERT_VALUES_EQUAL(res.GetStatus(), EStatus::SESSION_BUSY);
- }
- }
- }
- }
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), maxActiveSessions);
- auto curExpectedActive = maxActiveSessions;
- auto empty = 0;
- for (auto& r : results) {
- if (!r.empty()) {
- r.clear();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), --curExpectedActive);
- } else {
- empty++;
- }
- }
- UNIT_ASSERT_VALUES_EQUAL(empty, nThreads - maxActiveSessions);
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
- driver.Stop(true);
- }
-
- Y_UNIT_TEST(MultiThreadMultipleRequestsOnSharedSessionsTableClient) {
- struct TTypeHelper {
- using TClient = NYdb::NTable::TTableClient;
- using TResult = NYdb::NTable::TAsyncDataQueryResult;
- };
- DoMultiThreadMultipleRequestsOnSharedSessions<TTypeHelper>();
- }
-
- // Enable after interactive tx support
- //Y_UNIT_TEST(MultiThreadMultipleRequestsOnSharedSessionsQueryClient) {
- // struct TTypeHelper {
- // using TClient = NYdb::NQuery::TQueryClient;
- // using TResult = NYdb::NQuery::TAsyncExecuteQueryResult;
- // };
- // DoMultiThreadMultipleRequestsOnSharedSessions<TTypeHelper>();
- //}
-
- Y_UNIT_TEST(SessionsServerLimit) {
- NKikimrConfig::TAppConfig appConfig;
- auto& tableServiceConfig = *appConfig.MutableTableServiceConfig();
- tableServiceConfig.SetSessionsLimitPerNode(2);
-
- TKikimrWithGrpcAndRootSchema server(appConfig);
-
- NYdb::TDriver driver(TDriverConfig().SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort()));
- NYdb::NTable::TTableClient client(driver);
- auto sessionResult = client.CreateSession().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
- UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS);
- auto session1 = sessionResult.GetSession();
-
- sessionResult = client.CreateSession().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
- UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS);
- auto session2 = sessionResult.GetSession();
-
- sessionResult = client.CreateSession().ExtractValueSync();
- sessionResult.GetIssues().PrintTo(Cerr);
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
- UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::OVERLOADED);
-
- auto status = session1.Close().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(status.IsTransportError(), false);
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
- UNIT_ASSERT_VALUES_EQUAL(status.GetStatus(), EStatus::SUCCESS);
-
- auto result = session2.ExecuteDataQuery(R"___(
- SELECT 1;
- )___", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
- UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
-
- sessionResult = client.CreateSession().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS);
-
- sessionResult = client.CreateSession().ExtractValueSync();
- sessionResult.GetIssues().PrintTo(Cerr);
- UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::OVERLOADED);
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
- }
-
- Y_UNIT_TEST(SessionsServerLimitWithSessionPool) {
- NKikimrConfig::TAppConfig appConfig;
- auto& tableServiceConfig = *appConfig.MutableTableServiceConfig();
- tableServiceConfig.SetSessionsLimitPerNode(2);
-
- TKikimrWithGrpcAndRootSchema server(appConfig);
-
- NYdb::TDriver driver(TDriverConfig().SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort()));
- NYdb::NTable::TTableClient client(driver);
- auto sessionResult1 = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(sessionResult1.GetStatus(), EStatus::SUCCESS);
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1);
- auto session1 = sessionResult1.GetSession();
-
- auto sessionResult2 = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(sessionResult2.GetStatus(), EStatus::SUCCESS);
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2);
- auto session2 = sessionResult2.GetSession();
-
- {
- auto sessionResult3 = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(sessionResult3.GetStatus(), EStatus::OVERLOADED);
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 3);
- }
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2);
-
- auto status = session1.Close().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(status.IsTransportError(), false);
- UNIT_ASSERT_VALUES_EQUAL(status.GetStatus(), EStatus::SUCCESS);
-
- // Close doesnt free session from user perspective,
- // the value of ActiveSessionsCounter will be same after Close() call.
- // Probably we want to chenge this contract
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2);
-
- auto result = session2.ExecuteDataQuery(R"___(
- SELECT 1;
- )___", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
-
- sessionResult1 = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(sessionResult1.GetStatus(), EStatus::SUCCESS);
- UNIT_ASSERT_VALUES_EQUAL(sessionResult1.GetSession().GetId().empty(), false);
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 3);
-
- auto sessionResult3 = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(sessionResult3.GetStatus(), EStatus::OVERLOADED);
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 4);
-
- auto tmp = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 5);
- sessionResult1 = tmp; // here we reset previous created session object,
- // so perform close rpc call implicitly and delete it
- UNIT_ASSERT_VALUES_EQUAL(sessionResult1.GetStatus(), EStatus::OVERLOADED);
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 4);
- }
-
- Y_UNIT_TEST(CloseSessionAfterDriverDtorWithoutSessionPool) {
- NKikimrConfig::TAppConfig appConfig;
-
- TKikimrWithGrpcAndRootSchema server(appConfig);
-
- std::vector<std::string> sessionIds;
- int iterations = 50;
-
- while (iterations--) {
- NYdb::TDriver driver(TDriverConfig().SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort()));
- NYdb::NTable::TTableClient client(driver);
- auto sessionResult = client.CreateSession().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
- UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS);
- auto session1 = sessionResult.GetSession();
- sessionIds.push_back(session1.GetId());
- }
-
- std::shared_ptr<grpc::Channel> channel;
- channel = grpc::CreateChannel("localhost:" + ToString(server.GetPort()), grpc::InsecureChannelCredentials());
- auto stub = Ydb::Table::V1::TableService::NewStub(channel);
- for (const auto& sessionId : sessionIds) {
- grpc::ClientContext context;
- Ydb::Table::KeepAliveRequest request;
- request.set_session_id(TStringType{sessionId});
- Ydb::Table::KeepAliveResponse response;
- auto status = stub->KeepAlive(&context, request, &response);
- UNIT_ASSERT(status.ok());
- auto deferred = response.operation();
- UNIT_ASSERT(deferred.ready() == true);
- UNIT_ASSERT_VALUES_EQUAL(deferred.status(), Ydb::StatusIds::BAD_SESSION);
- }
- }
-
- Y_UNIT_TEST(CloseSessionWithSessionPoolExplicit) {
- NKikimrConfig::TAppConfig appConfig;
-
- TKikimrWithGrpcAndRootSchema server(appConfig);
-
- std::vector<std::string> sessionIds;
- int iterations = 100;
-
- while (iterations--) {
- NYdb::TDriver driver(TDriverConfig().SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort()));
- NYdb::NTable::TTableClient client(driver);
- //TODO: remove this scope after session tracker implementation
- {
- auto sessionResult = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1);
- UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS);
- auto session1 = sessionResult.GetSession();
- sessionIds.push_back(session1.GetId());
-
- sessionResult = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2);
- UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS);
- // Here previous created session will be returnet to session pool
- session1 = sessionResult.GetSession();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1);
- sessionIds.push_back(session1.GetId());
- }
-
- if (RandomNumber<ui32>(10) == 5) {
- client.Stop().Apply([client](NThreading::TFuture<void> future){
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
- return future;
- }).Wait();
- } else {
- client.Stop().Wait();
- }
-
- if (iterations & 4) {
- driver.Stop(true);
- }
- }
-
- std::shared_ptr<grpc::Channel> channel;
- channel = grpc::CreateChannel("localhost:" + ToString(server.GetPort()), grpc::InsecureChannelCredentials());
- auto stub = Ydb::Table::V1::TableService::NewStub(channel);
- for (const auto& sessionId : sessionIds) {
- grpc::ClientContext context;
- Ydb::Table::KeepAliveRequest request;
- request.set_session_id(TStringType{sessionId});
- Ydb::Table::KeepAliveResponse response;
- auto status = stub->KeepAlive(&context, request, &response);
- UNIT_ASSERT(status.ok());
- auto deferred = response.operation();
- UNIT_ASSERT(deferred.ready() == true);
- UNIT_ASSERT(deferred.status() == Ydb::StatusIds::BAD_SESSION);
- }
- }
-
- Y_UNIT_TEST(CloseSessionWithSessionPoolExplicitDriverStopOnly) {
- NKikimrConfig::TAppConfig appConfig;
-
- TKikimrWithGrpcAndRootSchema server(appConfig);
-
- std::vector<std::string> sessionIds;
- int iterations = 100;
-
- while (iterations--) {
- NYdb::TDriver driver(TDriverConfig().SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort()));
- NYdb::NTable::TTableClient client(driver);
- //TODO: remove this scope after session tracker implementation
- {
- auto sessionResult = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1);
- UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS);
- auto session1 = sessionResult.GetSession();
- sessionIds.push_back(session1.GetId());
-
- sessionResult = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2);
- UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS);
- // Here previous created session will be returnet to session pool
- session1 = sessionResult.GetSession();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1);
- sessionIds.push_back(session1.GetId());
- }
- driver.Stop(true);
- }
-
- std::shared_ptr<grpc::Channel> channel;
- channel = grpc::CreateChannel("localhost:" + ToString(server.GetPort()), grpc::InsecureChannelCredentials());
- auto stub = Ydb::Table::V1::TableService::NewStub(channel);
- for (const auto& sessionId : sessionIds) {
- grpc::ClientContext context;
- Ydb::Table::KeepAliveRequest request;
- request.set_session_id(TStringType{sessionId});
- Ydb::Table::KeepAliveResponse response;
- auto status = stub->KeepAlive(&context, request, &response);
- UNIT_ASSERT(status.ok());
- auto deferred = response.operation();
- UNIT_ASSERT(deferred.ready() == true);
- UNIT_ASSERT(deferred.status() == Ydb::StatusIds::BAD_SESSION);
- }
- }
-
- Y_UNIT_TEST(CloseSessionWithSessionPoolFromDtors) {
- NKikimrConfig::TAppConfig appConfig;
-
- TKikimrWithGrpcAndRootSchema server(appConfig);
-
- std::vector<std::string> sessionIds;
- int iterations = 100;
-
- while (iterations--) {
- NYdb::TDriver driver(TDriverConfig()
- .SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort()));
- NYdb::NTable::TTableClient client(driver);
- //TODO: remove this scope after session tracker implementation
- {
- auto sessionResult = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1);
- UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS);
- auto session1 = sessionResult.GetSession();
- sessionIds.push_back(session1.GetId());
-
- sessionResult = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2);
- UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS);
- // Here previous created session will be returnet to session pool
- session1 = sessionResult.GetSession();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1);
- sessionIds.push_back(session1.GetId());
- }
- }
-
- std::shared_ptr<grpc::Channel> channel;
- channel = grpc::CreateChannel("localhost:" + ToString(server.GetPort()), grpc::InsecureChannelCredentials());
- auto stub = Ydb::Table::V1::TableService::NewStub(channel);
- for (const auto& sessionId : sessionIds) {
- grpc::ClientContext context;
- Ydb::Table::KeepAliveRequest request;
- request.set_session_id(TStringType{sessionId});
- Ydb::Table::KeepAliveResponse response;
- auto status = stub->KeepAlive(&context, request, &response);
- UNIT_ASSERT(status.ok());
- auto deferred = response.operation();
- UNIT_ASSERT(deferred.ready() == true);
- UNIT_ASSERT(deferred.status() == Ydb::StatusIds::BAD_SESSION);
- }
- }
-}
diff --git a/ydb/services/ydb/ya.make b/ydb/services/ydb/ya.make
index 0dbb567a899..3c83efe6674 100644
--- a/ydb/services/ydb/ya.make
+++ b/ydb/services/ydb/ya.make
@@ -39,8 +39,6 @@ END()
RECURSE_FOR_TESTS(
backup_ut
- sdk_sessions_ut
- sdk_sessions_pool_ut
table_split_ut
ut
)