diff options
author | Bulat <bulat@ydb.tech> | 2025-04-22 12:47:19 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-04-22 12:47:19 +0300 |
commit | 24a3abb1a338a5293e7b45a411f9f17e48c144d0 (patch) | |
tree | 9ee31d442206ee13c747148a7947ec7485ea080e | |
parent | e040fdf1cb9d749133e6ecfb3d61cc1ace3e0f40 (diff) | |
download | ydb-24a3abb1a338a5293e7b45a411f9f17e48c144d0.tar.gz |
Moved sessions tests to C++ SDK repo (#17512)
-rw-r--r-- | ydb/public/sdk/cpp/tests/integration/sessions/main.cpp | 802 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/tests/integration/sessions/ya.make (renamed from ydb/services/ydb/sdk_sessions_ut/ya.make) | 9 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/tests/integration/sessions_pool/main.cpp | 367 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/tests/integration/sessions_pool/ya.make (renamed from ydb/services/ydb/sdk_sessions_pool_ut/ya.make) | 7 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/tests/integration/ya.make | 2 | ||||
-rw-r--r-- | ydb/services/ydb/sdk_sessions_pool_ut/sdk_sessions_pool_ut.cpp | 448 | ||||
-rw-r--r-- | ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp | 831 | ||||
-rw-r--r-- | ydb/services/ydb/ya.make | 2 |
8 files changed, 1178 insertions, 1290 deletions
diff --git a/ydb/public/sdk/cpp/tests/integration/sessions/main.cpp b/ydb/public/sdk/cpp/tests/integration/sessions/main.cpp new file mode 100644 index 00000000000..6a1af34fb27 --- /dev/null +++ b/ydb/public/sdk/cpp/tests/integration/sessions/main.cpp @@ -0,0 +1,802 @@ +#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h> +#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/query/client.h> + +#include <ydb/public/api/grpc/ydb_table_v1.grpc.pb.h> +#include <ydb/public/api/grpc/ydb_query_v1.grpc.pb.h> + +#include <ydb/public/sdk/cpp/src/library/grpc/client/grpc_client_low.h> + +#include <library/cpp/testing/gtest/gtest.h> + +#include <grpcpp/security/credentials.h> +#include <grpcpp/create_channel.h> + +#include <random> +#include <thread> + +using namespace NYdb; +using namespace NYdb::NTable; + +namespace { + +void CreateTestTable(NYdb::TDriver& driver) { + NYdb::NTable::TTableClient client(driver); + auto sessionResult = client.GetSession().ExtractValueSync(); + ASSERT_TRUE(sessionResult.IsSuccess()); + auto session = sessionResult.GetSession(); + auto result = session.ExecuteSchemeQuery(R"___( + CREATE TABLE `/local/t` ( + Key Uint32, + Value String, + PRIMARY KEY (Key) + ); + )___").ExtractValueSync(); + ASSERT_TRUE(result.IsSuccess()); + ASSERT_EQ(client.GetActiveSessionCount(), 1); +} + +void WarmPoolCreateSession(NYdb::NQuery::TQueryClient& client, std::string& sessionId) { + auto sessionResponse = client.GetSession().ExtractValueSync(); + ASSERT_TRUE(sessionResponse.IsSuccess()); + auto session = sessionResponse.GetSession(); + sessionId = session.GetId(); + auto res = session.ExecuteQuery("SELECT * FROM `Root/Test`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); + + ASSERT_EQ(res.GetStatus(), EStatus::SUCCESS) << res.GetIssues().ToString(); + + TResultSetParser resultSet(res.GetResultSetParser(0)); + ASSERT_EQ(resultSet.ColumnsCount(), 2u); +} + +void WaitForSessionsInPool(NYdb::NQuery::TQueryClient& client, std::int64_t expected) { + int attempt = 10; + while (attempt--) { + if (client.GetCurrentPoolSize() == expected) + break; + Sleep(TDuration::MilliSeconds(100)); + } + ASSERT_EQ(client.GetCurrentPoolSize(), expected); +} + +} + +void CheckDelete(const NYdbGrpc::TGRpcClientConfig& clientConfig, const std::string& id, int expected, bool& allDoneOk) { + NYdbGrpc::TGRpcClientLow clientLow; + auto connection = clientLow.CreateGRpcServiceConnection<Ydb::Query::V1::QueryService>(clientConfig); + + Ydb::Query::DeleteSessionRequest request; + request.set_session_id(id); + + NYdbGrpc::TResponseCallback<Ydb::Query::DeleteSessionResponse> responseCb = + [&allDoneOk, expected](NYdbGrpc::TGrpcStatus&& grpcStatus, Ydb::Query::DeleteSessionResponse&& response) -> void { + ASSERT_FALSE(grpcStatus.InternalError); + ASSERT_EQ(grpcStatus.GRpcStatusCode, 0) << grpcStatus.Msg + " " + grpcStatus.Details; + allDoneOk &= (response.status() == expected); + if (!allDoneOk) { + std::cerr << "Expected status: " << expected << ", got response: " << response.DebugString() << std::endl; + } + }; + + connection->DoRequest(request, std::move(responseCb), &Ydb::Query::V1::QueryService::Stub::AsyncDeleteSession); +} + +TEST(YdbSdkSessions, TestSessionPool) { + const std::string location = std::getenv("YDB_ENDPOINT"); + + auto driver = NYdb::TDriver( + TDriverConfig() + .SetEndpoint(location)); + + NYdb::NTable::TTableClient client(driver); + int count = 10; + + std::unordered_set<std::string> sids; + while (count--) { + auto sessionResponse = client.GetSession().ExtractValueSync(); + ASSERT_FALSE(sessionResponse.IsTransportError()); + auto session = sessionResponse.GetSession(); + sids.insert(session.GetId()); + auto result = session.ExecuteDataQuery("SELECT 42;", + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + + ASSERT_EQ(result.GetStatus(), EStatus::SUCCESS); + ASSERT_EQ(result.GetEndpoint(), location); + } + // All requests used one session + ASSERT_EQ(sids.size(), 1u); + // No more session captured by client + ASSERT_EQ(client.GetActiveSessionCount(), 0u); + + driver.Stop(true); +} + +TEST(YdbSdkSessions, TestMultipleSessions) { + std::string location = std::getenv("YDB_ENDPOINT"); + + auto driver = NYdb::TDriver( + TDriverConfig() + .SetEndpoint(location)); + + NYdb::NTable::TTableClient client(driver); + int count = 10; + + std::vector<TSession> sids; + std::vector<TAsyncDataQueryResult> results; + while (count--) { + auto sessionResponse = client.GetSession().ExtractValueSync(); + ASSERT_FALSE(sessionResponse.IsTransportError()); + auto session = sessionResponse.GetSession(); + sids.push_back(session); + results.push_back(session.ExecuteDataQuery("SELECT 42;", + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx())); + } + + NThreading::WaitExceptionOrAll(results).Wait(); + ASSERT_EQ(client.GetActiveSessionCount(), 10); + + for (auto& result : results) { + ASSERT_EQ(result.GetValue().GetStatus(), EStatus::SUCCESS); + } + sids.clear(); + results.clear(); + + ASSERT_EQ(client.GetActiveSessionCount(), 0); + + driver.Stop(true); +} + +TEST(YdbSdkSessions, TestActiveSessionCountAfterBadSession) { + std::string location = std::getenv("YDB_ENDPOINT"); + + auto driver = NYdb::TDriver( + TDriverConfig() + .SetEndpoint(location)); + + NYdb::NTable::TTableClient client(driver); + int count = 10; + + std::vector<TSession> sids; + std::vector<TAsyncDataQueryResult> results; + while (count--) { + auto sessionResponse = client.GetSession().ExtractValueSync(); + ASSERT_FALSE(sessionResponse.IsTransportError()); + auto session = sessionResponse.GetSession(); + sids.push_back(session); + if (count == 0) { + // Force BAD session server response for ExecuteDataQuery + ASSERT_EQ(session.Close().GetValueSync().GetStatus(), EStatus::SUCCESS); + results.push_back(session.ExecuteDataQuery("SELECT 42;", + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx())); + } else { + results.push_back(session.ExecuteDataQuery("SELECT 42;", + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx())); + } + } + + NThreading::WaitExceptionOrAll(results).Wait(); + ASSERT_EQ(client.GetActiveSessionCount(), 10); + + for (size_t i = 0; i < results.size(); i++) { + if (i == 9) { + ASSERT_EQ(results[i].GetValue().GetStatus(), EStatus::BAD_SESSION); + } else { + ASSERT_EQ(results[i].GetValue().GetStatus(), EStatus::SUCCESS); + } + } + sids.clear(); + results.clear(); + + ASSERT_EQ(client.GetActiveSessionCount(), 0); + + driver.Stop(true); +} + +TEST(YdbSdkSessions, TestSdkFreeSessionAfterBadSessionQueryService) { + GTEST_SKIP() << "Test is failing right now"; + std::string location = std::getenv("YDB_ENDPOINT"); + auto clientConfig = NYdbGrpc::TGRpcClientConfig(location); + + auto driver = NYdb::TDriver( + TDriverConfig() + .SetEndpoint(location)); + + CreateTestTable(driver); + + NYdb::NQuery::TQueryClient client(driver); + std::string sessionId; + WarmPoolCreateSession(client, sessionId); + WaitForSessionsInPool(client, 1); + + bool allDoneOk = true; + CheckDelete(clientConfig, sessionId, Ydb::StatusIds::SUCCESS, allDoneOk); + ASSERT_TRUE(allDoneOk); + + { + auto sessionResponse = client.GetSession().ExtractValueSync(); + ASSERT_TRUE(sessionResponse.IsSuccess()); + auto session = sessionResponse.GetSession(); + ASSERT_EQ(session.GetId(), sessionId); + + auto res = session.ExecuteQuery("SELECT * FROM `/local/t`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); + + ASSERT_EQ(res.GetStatus(), EStatus::BAD_SESSION) << res.GetIssues().ToString(); + } + + WaitForSessionsInPool(client, 0); + + { + auto sessionResponse = client.GetSession().ExtractValueSync(); + ASSERT_TRUE(sessionResponse.IsSuccess()); + auto session = sessionResponse.GetSession(); + ASSERT_NE(session.GetId(), sessionId); + auto res = session.ExecuteQuery("SELECT * FROM `/local/t`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); + + ASSERT_EQ(res.GetStatus(), EStatus::SUCCESS) << res.GetIssues().ToString(); + } + + WaitForSessionsInPool(client, 1); + + driver.Stop(true); +} + +TEST(YdbSdkSessions, TestSdkFreeSessionAfterBadSessionQueryServiceStreamCall) { + GTEST_SKIP() << "Test is failing right now"; + std::string location = std::getenv("YDB_ENDPOINT"); + auto clientConfig = NYdbGrpc::TGRpcClientConfig(location); + + auto driver = NYdb::TDriver( + TDriverConfig() + .SetEndpoint(location)); + + CreateTestTable(driver); + + NYdb::NQuery::TQueryClient client(driver); + std::string sessionId; + WarmPoolCreateSession(client, sessionId); + WaitForSessionsInPool(client, 1); + + bool allDoneOk = true; + CheckDelete(clientConfig, sessionId, Ydb::StatusIds::SUCCESS, allDoneOk); + ASSERT_TRUE(allDoneOk); + + { + auto sessionResponse = client.GetSession().ExtractValueSync(); + ASSERT_TRUE(sessionResponse.IsSuccess()); + auto session = sessionResponse.GetSession(); + ASSERT_EQ(session.GetId(), sessionId); + + auto it = session.StreamExecuteQuery("SELECT * FROM `/local/t`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); + + ASSERT_EQ(it.GetStatus(), EStatus::SUCCESS) << it.GetIssues().ToString(); + + auto res = it.ReadNext().GetValueSync(); + ASSERT_EQ(res.GetStatus(), EStatus::BAD_SESSION) << res.GetIssues().ToString(); + } + + WaitForSessionsInPool(client, 0); + + { + auto sessionResponse = client.GetSession().ExtractValueSync(); + ASSERT_TRUE(sessionResponse.IsSuccess()); + auto session = sessionResponse.GetSession(); + ASSERT_NE(session.GetId(), sessionId); + + auto res = session.ExecuteQuery("SELECT * FROM `/local/t`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); + + ASSERT_EQ(res.GetStatus(), EStatus::SUCCESS) << res.GetIssues().ToString(); + } + + WaitForSessionsInPool(client, 1); + + driver.Stop(true); +} + +TEST(YdbSdkSessions, TestActiveSessionCountAfterTransportError) { + std::string location = std::getenv("YDB_ENDPOINT"); + + auto driver = NYdb::TDriver( + TDriverConfig() + .SetEndpoint(location)); + + NYdb::NTable::TTableClient client(driver); + int count = 100; + + { + auto sessionResponse = client.GetSession().ExtractValueSync(); + ASSERT_TRUE(sessionResponse.IsSuccess()); + auto session = sessionResponse.GetSession(); + auto result = session.ExecuteSchemeQuery(R"___( + CREATE TABLE `/local/t` ( + Key Uint32, + Value String, + PRIMARY KEY (Key) + ); + )___").ExtractValueSync(); + ASSERT_TRUE(result.IsSuccess()); + ASSERT_EQ(client.GetActiveSessionCount(), 1); + } + + while (count--) { + auto sessionResponse = client.GetSession().ExtractValueSync(); + ASSERT_FALSE(sessionResponse.IsTransportError()); + auto session = sessionResponse.GetSession(); + + // Assume 10us is too small to execute query and get response + auto res = session.ExecuteDataQuery("SELECT COUNT(*) FROM `/local/t`;", + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), + NYdb::NTable::TExecDataQuerySettings().ClientTimeout(TDuration::MicroSeconds(10))).GetValueSync(); + ASSERT_EQ(client.GetActiveSessionCount(), 1); + } + + ASSERT_EQ(client.GetActiveSessionCount(), 0); + driver.Stop(true); +} + +TEST(YdbSdkSessions, MultiThreadSync) { + std::string location = std::getenv("YDB_ENDPOINT"); + + auto driver = NYdb::TDriver( + TDriverConfig() + .SetEndpoint(location)); + + NYdb::NTable::TTableClient client(driver); + const int nThreads = 10; + const int nRequests = 1000; + auto job = [client]() mutable { + for (int i = 0; i < nRequests; i++) { + auto sessionResponse = client.GetSession().ExtractValueSync(); + ASSERT_EQ(sessionResponse.GetStatus(), EStatus::SUCCESS); + } + }; + std::vector<std::thread> threads; + for (int i = 0; i < nThreads; i++) { + threads.emplace_back(job); + } + for (auto& thread : threads) { + thread.join(); + } + ASSERT_EQ(client.GetActiveSessionCount(), 0); + driver.Stop(true); +} + +void EnsureCanExecQuery(NYdb::NTable::TSession session) { + auto execStatus = session.ExecuteDataQuery("SELECT 42;", + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync().GetStatus(); + ASSERT_EQ(execStatus, EStatus::SUCCESS); +} + +void EnsureCanExecQuery(NYdb::NQuery::TSession session) { + auto execStatus = session.ExecuteQuery("SELECT 42;", + NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync().GetStatus(); + ASSERT_EQ(execStatus, EStatus::SUCCESS); +} + +template<typename TClient> +void DoMultiThreadSessionPoolLimitSync() { + std::string location = std::getenv("YDB_ENDPOINT"); + + auto driver = NYdb::TDriver( + TDriverConfig() + .SetEndpoint(location)); + + const int maxActiveSessions = 45; + TClient client(driver, + typename TClient::TSettings() + .SessionPoolSettings( + typename TClient::TSettings::TSessionPoolSettings().MaxActiveSessions(maxActiveSessions))); + + constexpr int nThreads = 100; + NYdb::EStatus statuses[nThreads]; + std::vector<std::optional<typename TClient::TSession>> sessions; + sessions.resize(nThreads); + std::atomic<int> t = 0; + auto job = [client, &t, &statuses, &sessions]() mutable { + auto sessionResponse = client.GetSession().ExtractValueSync(); + int i = ++t; + statuses[--i] = sessionResponse.GetStatus(); + if (statuses[i] == EStatus::SUCCESS) { + EnsureCanExecQuery(sessionResponse.GetSession()); + sessions[i] = sessionResponse.GetSession(); + } + }; + + std::vector<std::thread> threads; + threads.resize(nThreads); + for (int i = 0; i < nThreads; i++) { + threads[i] = std::thread(job); + } + for (int i = 0; i < nThreads; i++) { + threads[i].join(); + } + + sessions.clear(); + + int successCount = 0; + int exhaustedCount = 0; + for (int i = 0; i < nThreads; i++) { + switch (statuses[i]) { + case EStatus::SUCCESS: + successCount++; + break; + case EStatus::CLIENT_RESOURCE_EXHAUSTED: + exhaustedCount++; + break; + default: + FAIL() << "Unexpected status code: " << static_cast<size_t>(statuses[i]); + } + } + + ASSERT_EQ(client.GetActiveSessionCount(), 0); + ASSERT_EQ(successCount, maxActiveSessions); + ASSERT_EQ(exhaustedCount, nThreads - maxActiveSessions); + driver.Stop(true); +} + +TEST(YdbSdkSessions, MultiThreadSessionPoolLimitSyncTableClient) { + DoMultiThreadSessionPoolLimitSync<NYdb::NTable::TTableClient>(); +} + +TEST(YdbSdkSessions, MultiThreadSessionPoolLimitSyncQueryClient) { + DoMultiThreadSessionPoolLimitSync<NYdb::NQuery::TQueryClient>(); +} + +NYdb::NTable::TAsyncDataQueryResult ExecQueryAsync(NYdb::NTable::TSession session, const std::string q) { + return session.ExecuteDataQuery(q, + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()); +} + +NYdb::NQuery::TAsyncExecuteQueryResult ExecQueryAsync(NYdb::NQuery::TSession session, const std::string q) { + return session.ExecuteQuery(q, + NYdb::NQuery::TTxControl::BeginTx().CommitTx()); +} + +template<typename T> +void DoMultiThreadMultipleRequestsOnSharedSessions() { + std::string location = std::getenv("YDB_ENDPOINT"); + + auto driver = NYdb::TDriver( + TDriverConfig() + .SetEndpoint(location)); + + const int maxActiveSessions = 10; + typename T::TClient client(driver, + typename T::TClient::TSettings() + .SessionPoolSettings( + typename T::TClient::TSettings::TSessionPoolSettings().MaxActiveSessions(maxActiveSessions))); + + constexpr int nThreads = 20; + constexpr int nRequests = 50; + std::array<std::vector<typename T::TResult>, nThreads> results; + std::atomic<int> t = 0; + std::atomic<int> validSessions = 0; + auto job = [client, &t, &results, &validSessions]() mutable { + auto sessionResponse = client.GetSession().ExtractValueSync(); + + int i = ++t; + std::vector<typename T::TResult>& r = results[--i]; + + if (sessionResponse.GetStatus() != EStatus::SUCCESS) { + return; + } + validSessions.fetch_add(1); + + for (int i = 0; i < nRequests; i++) { + r.push_back(ExecQueryAsync(sessionResponse.GetSession(), "SELECT 42;")); + } + }; + + std::vector<std::thread> threads; + for (int i = 0; i < nThreads; i++) { + threads.emplace_back(job); + } + for (auto& thread : threads) { + thread.join(); + } + + for (auto& r : results) { + NThreading::WaitExceptionOrAll(r).Wait(); + } + for (auto& r : results) { + if (!r.empty()) { + for (auto& asyncStatus : r) { + auto res = asyncStatus.GetValue(); + if (!res.IsSuccess()) { + ASSERT_EQ(res.GetStatus(), EStatus::SESSION_BUSY); + } + } + } + } + ASSERT_EQ(client.GetActiveSessionCount(), maxActiveSessions); + auto curExpectedActive = maxActiveSessions; + auto empty = 0; + for (auto& r : results) { + if (!r.empty()) { + r.clear(); + ASSERT_EQ(client.GetActiveSessionCount(), --curExpectedActive); + } else { + empty++; + } + } + ASSERT_EQ(empty, nThreads - maxActiveSessions); + ASSERT_EQ(client.GetActiveSessionCount(), 0); + driver.Stop(true); +} + +TEST(YdbSdkSessions, MultiThreadMultipleRequestsOnSharedSessionsTableClient) { + struct TTypeHelper { + using TClient = NYdb::NTable::TTableClient; + using TResult = NYdb::NTable::TAsyncDataQueryResult; + }; + DoMultiThreadMultipleRequestsOnSharedSessions<TTypeHelper>(); +} + +TEST(YdbSdkSessions, MultiThreadMultipleRequestsOnSharedSessionsQueryClient) { + GTEST_SKIP() << "Enable after interactive tx support"; + struct TTypeHelper { + using TClient = NYdb::NQuery::TQueryClient; + using TResult = NYdb::NQuery::TAsyncExecuteQueryResult; + }; + DoMultiThreadMultipleRequestsOnSharedSessions<TTypeHelper>(); +} + +TEST(YdbSdkSessions, SessionsServerLimit) { + GTEST_SKIP() << "Enable after accepting a pull request with merging configs"; + + NYdb::TDriver driver(TDriverConfig().SetEndpoint(std::getenv("YDB_ENDPOINT"))); + NYdb::NTable::TTableClient client(driver); + auto sessionResult = client.CreateSession().ExtractValueSync(); + ASSERT_EQ(client.GetActiveSessionCount(), 0); + ASSERT_EQ(sessionResult.GetStatus(), EStatus::SUCCESS); + auto session1 = sessionResult.GetSession(); + + sessionResult = client.CreateSession().ExtractValueSync(); + ASSERT_EQ(client.GetActiveSessionCount(), 0); + ASSERT_EQ(sessionResult.GetStatus(), EStatus::SUCCESS); + auto session2 = sessionResult.GetSession(); + + sessionResult = client.CreateSession().ExtractValueSync(); + sessionResult.GetIssues().PrintTo(Cerr); + ASSERT_EQ(client.GetActiveSessionCount(), 0); + ASSERT_EQ(sessionResult.GetStatus(), EStatus::OVERLOADED); + + auto status = session1.Close().ExtractValueSync(); + ASSERT_EQ(status.IsTransportError(), false); + ASSERT_EQ(client.GetActiveSessionCount(), 0); + ASSERT_EQ(status.GetStatus(), EStatus::SUCCESS); + + auto result = session2.ExecuteDataQuery(R"___( + SELECT 1; + )___", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + ASSERT_EQ(client.GetActiveSessionCount(), 0); + ASSERT_EQ(result.GetStatus(), EStatus::SUCCESS); + + sessionResult = client.CreateSession().ExtractValueSync(); + ASSERT_EQ(sessionResult.GetStatus(), EStatus::SUCCESS); + + sessionResult = client.CreateSession().ExtractValueSync(); + sessionResult.GetIssues().PrintTo(Cerr); + ASSERT_EQ(sessionResult.GetStatus(), EStatus::OVERLOADED); + ASSERT_EQ(client.GetActiveSessionCount(), 0); +} + +TEST(YdbSdkSessions, SessionsServerLimitWithSessionPool) { + GTEST_SKIP() << "Enable after accepting a pull request with merging configs"; + NYdb::TDriver driver(TDriverConfig().SetEndpoint(std::getenv("YDB_ENDPOINT"))); + NYdb::NTable::TTableClient client(driver); + auto sessionResult1 = client.GetSession().ExtractValueSync(); + ASSERT_EQ(sessionResult1.GetStatus(), EStatus::SUCCESS); + ASSERT_EQ(client.GetActiveSessionCount(), 1); + auto session1 = sessionResult1.GetSession(); + + auto sessionResult2 = client.GetSession().ExtractValueSync(); + ASSERT_EQ(sessionResult2.GetStatus(), EStatus::SUCCESS); + ASSERT_EQ(client.GetActiveSessionCount(), 2); + auto session2 = sessionResult2.GetSession(); + + { + auto sessionResult3 = client.GetSession().ExtractValueSync(); + ASSERT_EQ(sessionResult3.GetStatus(), EStatus::OVERLOADED); + ASSERT_EQ(client.GetActiveSessionCount(), 3); + } + ASSERT_EQ(client.GetActiveSessionCount(), 2); + + auto status = session1.Close().ExtractValueSync(); + ASSERT_EQ(status.IsTransportError(), false); + ASSERT_EQ(status.GetStatus(), EStatus::SUCCESS); + + // Close doesnt free session from user perspective, + // the value of ActiveSessionsCounter will be same after Close() call. + // Probably we want to chenge this contract + ASSERT_EQ(client.GetActiveSessionCount(), 2); + + auto result = session2.ExecuteDataQuery(R"___( + SELECT 1; + )___", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + ASSERT_EQ(result.GetStatus(), EStatus::SUCCESS); + + sessionResult1 = client.GetSession().ExtractValueSync(); + ASSERT_EQ(sessionResult1.GetStatus(), EStatus::SUCCESS); + ASSERT_EQ(sessionResult1.GetSession().GetId().empty(), false); + ASSERT_EQ(client.GetActiveSessionCount(), 3); + + auto sessionResult3 = client.GetSession().ExtractValueSync(); + ASSERT_EQ(sessionResult3.GetStatus(), EStatus::OVERLOADED); + ASSERT_EQ(client.GetActiveSessionCount(), 4); + + auto tmp = client.GetSession().ExtractValueSync(); + ASSERT_EQ(client.GetActiveSessionCount(), 5); + sessionResult1 = tmp; // here we reset previous created session object, + // so perform close rpc call implicitly and delete it + ASSERT_EQ(sessionResult1.GetStatus(), EStatus::OVERLOADED); + ASSERT_EQ(client.GetActiveSessionCount(), 4); +} + +TEST(YdbSdkSessions, CloseSessionAfterDriverDtorWithoutSessionPool) { + std::vector<std::string> sessionIds; + int iterations = 50; + + while (iterations--) { + NYdb::TDriver driver(TDriverConfig().SetEndpoint(std::getenv("YDB_ENDPOINT"))); + NYdb::NTable::TTableClient client(driver); + auto sessionResult = client.CreateSession().ExtractValueSync(); + ASSERT_EQ(client.GetActiveSessionCount(), 0); + ASSERT_EQ(sessionResult.GetStatus(), EStatus::SUCCESS); + auto session1 = sessionResult.GetSession(); + sessionIds.push_back(session1.GetId()); + } + + std::shared_ptr<grpc::Channel> channel; + channel = grpc::CreateChannel(std::getenv("YDB_ENDPOINT"), grpc::InsecureChannelCredentials()); + auto stub = Ydb::Table::V1::TableService::NewStub(channel); + for (const auto& sessionId : sessionIds) { + grpc::ClientContext context; + Ydb::Table::KeepAliveRequest request; + request.set_session_id(sessionId); + Ydb::Table::KeepAliveResponse response; + auto status = stub->KeepAlive(&context, request, &response); + ASSERT_TRUE(status.ok()); + auto deferred = response.operation(); + ASSERT_TRUE(deferred.ready() == true); + ASSERT_EQ(deferred.status(), Ydb::StatusIds::BAD_SESSION); + } +} + +TEST(YdbSdkSessions, CloseSessionWithSessionPoolExplicit) { + std::vector<std::string> sessionIds; + int iterations = 100; + + while (iterations--) { + NYdb::TDriver driver(TDriverConfig().SetEndpoint(std::getenv("YDB_ENDPOINT"))); + NYdb::NTable::TTableClient client(driver); + //TODO: remove this scope after session tracker implementation + { + auto sessionResult = client.GetSession().ExtractValueSync(); + ASSERT_EQ(client.GetActiveSessionCount(), 1); + ASSERT_EQ(sessionResult.GetStatus(), EStatus::SUCCESS); + auto session1 = sessionResult.GetSession(); + sessionIds.push_back(session1.GetId()); + + sessionResult = client.GetSession().ExtractValueSync(); + ASSERT_EQ(client.GetActiveSessionCount(), 2); + ASSERT_EQ(sessionResult.GetStatus(), EStatus::SUCCESS); + // Here previous created session will be returnet to session pool + session1 = sessionResult.GetSession(); + ASSERT_EQ(client.GetActiveSessionCount(), 1); + sessionIds.push_back(session1.GetId()); + } + + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> dis(0, 9); + if (dis(gen) == 5) { + client.Stop().Apply([client](NThreading::TFuture<void> future) { + EXPECT_EQ(client.GetActiveSessionCount(), 0); + return future; + }).Wait(); + } else { + client.Stop().Wait(); + } + + if (iterations & 4) { + driver.Stop(true); + } + } + + std::shared_ptr<grpc::Channel> channel; + channel = grpc::CreateChannel(std::getenv("YDB_ENDPOINT"), grpc::InsecureChannelCredentials()); + auto stub = Ydb::Table::V1::TableService::NewStub(channel); + for (const auto& sessionId : sessionIds) { + grpc::ClientContext context; + Ydb::Table::KeepAliveRequest request; + request.set_session_id(sessionId); + Ydb::Table::KeepAliveResponse response; + auto status = stub->KeepAlive(&context, request, &response); + ASSERT_TRUE(status.ok()); + auto deferred = response.operation(); + ASSERT_TRUE(deferred.ready() == true); + ASSERT_EQ(deferred.status(), Ydb::StatusIds::BAD_SESSION); + } +} + +TEST(YdbSdkSessions, CloseSessionWithSessionPoolExplicitDriverStopOnly) { + std::vector<std::string> sessionIds; + int iterations = 100; + + while (iterations--) { + NYdb::TDriver driver(TDriverConfig().SetEndpoint(std::getenv("YDB_ENDPOINT"))); + NYdb::NTable::TTableClient client(driver); + //TODO: remove this scope after session tracker implementation + { + auto sessionResult = client.GetSession().ExtractValueSync(); + ASSERT_EQ(client.GetActiveSessionCount(), 1); + ASSERT_EQ(sessionResult.GetStatus(), EStatus::SUCCESS); + auto session1 = sessionResult.GetSession(); + sessionIds.push_back(session1.GetId()); + + sessionResult = client.GetSession().ExtractValueSync(); + ASSERT_EQ(client.GetActiveSessionCount(), 2); + ASSERT_EQ(sessionResult.GetStatus(), EStatus::SUCCESS); + // Here previous created session will be returnet to session pool + session1 = sessionResult.GetSession(); + ASSERT_EQ(client.GetActiveSessionCount(), 1); + sessionIds.push_back(session1.GetId()); + } + driver.Stop(true); + } + + std::shared_ptr<grpc::ChannelInterface> channel; + channel = grpc::CreateChannel(std::getenv("YDB_ENDPOINT"), grpc::InsecureChannelCredentials()); + auto stub = Ydb::Table::V1::TableService::NewStub(channel); + for (const auto& sessionId : sessionIds) { + grpc::ClientContext context; + Ydb::Table::KeepAliveRequest request; + request.set_session_id(sessionId); + Ydb::Table::KeepAliveResponse response; + auto status = stub->KeepAlive(&context, request, &response); + ASSERT_TRUE(status.ok()); + auto deferred = response.operation(); + ASSERT_TRUE(deferred.ready() == true); + ASSERT_EQ(deferred.status(), Ydb::StatusIds::BAD_SESSION); + } +} + +TEST(YdbSdkSessions, CloseSessionWithSessionPoolFromDtors) { + std::vector<std::string> sessionIds; + int iterations = 100; + + while (iterations--) { + NYdb::TDriver driver(TDriverConfig().SetEndpoint(std::getenv("YDB_ENDPOINT"))); + NYdb::NTable::TTableClient client(driver); + //TODO: remove this scope after session tracker implementation + { + auto sessionResult = client.GetSession().ExtractValueSync(); + ASSERT_EQ(client.GetActiveSessionCount(), 1); + ASSERT_EQ(sessionResult.GetStatus(), EStatus::SUCCESS); + auto session1 = sessionResult.GetSession(); + sessionIds.push_back(session1.GetId()); + + sessionResult = client.GetSession().ExtractValueSync(); + ASSERT_EQ(client.GetActiveSessionCount(), 2); + ASSERT_EQ(sessionResult.GetStatus(), EStatus::SUCCESS); + // Here previous created session will be returnet to session pool + session1 = sessionResult.GetSession(); + ASSERT_EQ(client.GetActiveSessionCount(), 1); + sessionIds.push_back(session1.GetId()); + } + } + + std::shared_ptr<grpc::Channel> channel; + channel = grpc::CreateChannel(std::getenv("YDB_ENDPOINT"), grpc::InsecureChannelCredentials()); + auto stub = Ydb::Table::V1::TableService::NewStub(channel); + for (const auto& sessionId : sessionIds) { + grpc::ClientContext context; + Ydb::Table::KeepAliveRequest request; + request.set_session_id(sessionId); + Ydb::Table::KeepAliveResponse response; + auto status = stub->KeepAlive(&context, request, &response); + ASSERT_TRUE(status.ok()); + auto deferred = response.operation(); + ASSERT_TRUE(deferred.ready() == true); + ASSERT_EQ(deferred.status(), Ydb::StatusIds::BAD_SESSION); + } +} diff --git a/ydb/services/ydb/sdk_sessions_ut/ya.make b/ydb/public/sdk/cpp/tests/integration/sessions/ya.make index c62a0941b93..f263b4eb2b8 100644 --- a/ydb/services/ydb/sdk_sessions_ut/ya.make +++ b/ydb/public/sdk/cpp/tests/integration/sessions/ya.make @@ -1,4 +1,5 @@ -UNITTEST_FOR(ydb/services/ydb) +GTEST() +INCLUDE(${ARCADIA_ROOT}/ydb/public/tools/ydb_recipe/recipe.inc) FORK_SUBTESTS() @@ -11,15 +12,13 @@ ELSE() ENDIF() SRCS( - sdk_sessions_ut.cpp + main.cpp ) PEERDIR( ydb/public/sdk/cpp/src/library/grpc/client - ydb/core/testlib/default - ydb/core/testlib ydb/public/sdk/cpp/src/client/table - ydb/public/lib/ut_helpers + ydb/public/sdk/cpp/src/client/query ) YQL_LAST_ABI_VERSION() diff --git a/ydb/public/sdk/cpp/tests/integration/sessions_pool/main.cpp b/ydb/public/sdk/cpp/tests/integration/sessions_pool/main.cpp new file mode 100644 index 00000000000..c12937a25ad --- /dev/null +++ b/ydb/public/sdk/cpp/tests/integration/sessions_pool/main.cpp @@ -0,0 +1,367 @@ +#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h> + +#include <ydb/public/api/grpc/ydb_table_v1.grpc.pb.h> + +#include <library/cpp/testing/gtest/gtest.h> + +#include <random> +#include <thread> + +using namespace NYdb; +using namespace NYdb::NTable; + +class YdbSdkSessionsPool : public ::testing::TestWithParam<ui32> { +protected: + void SetUp() override { + ui32 maxActiveSessions = GetParam(); + Driver = std::make_unique<NYdb::TDriver>(TDriverConfig().SetEndpoint(std::getenv("YDB_ENDPOINT"))); + + auto clientSettings = TClientSettings().SessionPoolSettings( + TSessionPoolSettings() + .MaxActiveSessions(maxActiveSessions) + .KeepAliveIdleThreshold(TDuration::MilliSeconds(10)) + .CloseIdleThreshold(TDuration::MilliSeconds(10))); + Client = std::make_unique<NYdb::NTable::TTableClient>(*Driver, clientSettings); + } + + void TearDown() override { + Driver->Stop(true); + } + +protected: + std::unique_ptr<NYdb::TDriver> Driver; + std::unique_ptr<NYdb::NTable::TTableClient> Client; +}; + +class YdbSdkSessionsPool1Session : public YdbSdkSessionsPool {}; + +enum class EAction: ui8 { + CreateFuture, + ExtractValue, + Return +}; +using TPlan = std::vector<std::pair<EAction, ui32>>; + + +void CheckPlan(TPlan plan) { + std::unordered_map<ui32, EAction> sessions; + for (const auto& [action, sessionId]: plan) { + if (action == EAction::CreateFuture) { + ASSERT_FALSE(sessions.contains(sessionId)); + } else { + ASSERT_TRUE(sessions.contains(sessionId)); + switch (sessions.at(sessionId)) { + case EAction::CreateFuture: { + ASSERT_EQ(action, EAction::ExtractValue); + break; + } + case EAction::ExtractValue: { + ASSERT_EQ(action, EAction::Return); + break; + } + default: { + ASSERT_TRUE(false); + } + } + } + sessions[sessionId] = action; + } +} + +void RunPlan(const TPlan& plan, NYdb::NTable::TTableClient& client) { + std::unordered_map<ui32, NThreading::TFuture<NYdb::NTable::TCreateSessionResult>> sessionFutures; + std::unordered_map<ui32, NYdb::NTable::TCreateSessionResult> sessions; + + ui32 requestedSessions = 0; + + for (const auto& [action, sessionId]: plan) { + switch (action) { + case EAction::CreateFuture: { + sessionFutures.emplace(sessionId, client.GetSession()); + ++requestedSessions; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + if (requestedSessions > client.GetActiveSessionsLimit()) { + ASSERT_EQ(client.GetActiveSessionCount(), client.GetActiveSessionsLimit()); + } + ASSERT_FALSE(sessionFutures.at(sessionId).HasValue()); + break; + } + case EAction::ExtractValue: { + auto it = sessionFutures.find(sessionId); + auto session = it->second.ExtractValueSync(); + sessionFutures.erase(it); + sessions.emplace(sessionId, std::move(session)); + break; + } + case EAction::Return: { + sessions.erase(sessionId); + --requestedSessions; + break; + } + } + ASSERT_LE(client.GetActiveSessionCount(), client.GetActiveSessionsLimit()); + ASSERT_GE(client.GetActiveSessionCount(), static_cast<i64>(sessions.size())); + ASSERT_LE(client.GetActiveSessionCount(), static_cast<i64>(sessions.size() + sessionFutures.size())); + } +} + +int GetRand(std::mt19937& rng, int min, int max) { + std::uniform_int_distribution<std::mt19937::result_type> dist(min, max); + return dist(rng); +} + + +TPlan GenerateRandomPlan(ui32 numSessions) { + TPlan plan; + std::random_device dev; + std::mt19937 rng(dev()); + + for (ui32 i = 0; i < numSessions; ++i) { + std::uniform_int_distribution<std::mt19937::result_type> dist(0, plan.size()); + ui32 prevPos = 0; + for (EAction action: {EAction::CreateFuture, EAction::ExtractValue, EAction::Return}) { + int pos = GetRand(rng, prevPos, plan.size()); + plan.emplace(plan.begin() + pos, std::make_pair(action, i)); + prevPos = pos + 1; + } + } + return plan; +} + + +TEST_P(YdbSdkSessionsPool1Session, GetSession) { + ASSERT_EQ(Client->GetActiveSessionsLimit(), 1); + ASSERT_EQ(Client->GetActiveSessionCount(), 0); + ASSERT_EQ(Client->GetCurrentPoolSize(), 0); + + { + auto session = Client->GetSession().ExtractValueSync(); + + ASSERT_EQ(session.GetStatus(), EStatus::SUCCESS); + ASSERT_EQ(Client->GetActiveSessionCount(), 1); + ASSERT_EQ(Client->GetCurrentPoolSize(), 0); + } + + ASSERT_EQ(Client->GetActiveSessionCount(), 0); + ASSERT_EQ(Client->GetCurrentPoolSize(), 1); +} + +void TestWaitQueue(NYdb::NTable::TTableClient& client, ui32 activeSessionsLimit) { + std::vector<NThreading::TFuture<NYdb::NTable::TCreateSessionResult>> sessionFutures; + std::vector<NYdb::NTable::TCreateSessionResult> sessions; + + // exhaust the pool + for (ui32 i = 0; i < activeSessionsLimit; ++i) { + sessions.emplace_back(client.GetSession().ExtractValueSync()); + } + ASSERT_EQ(client.GetActiveSessionCount(), activeSessionsLimit); + + // next should be in the wait queue + for (ui32 i = 0; i < activeSessionsLimit * 10; ++i) { + sessionFutures.emplace_back(client.GetSession()); + } + ASSERT_EQ(client.GetActiveSessionCount(), activeSessionsLimit); + + // next should be a fake session + { + auto brokenSession = client.GetSession().ExtractValueSync(); + ASSERT_FALSE(brokenSession.IsSuccess()); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + for (auto& sessionFuture: sessionFutures) { + ASSERT_FALSE(sessionFuture.HasValue()); + } + + for (auto& sessionFuture: sessionFutures) { + sessions.erase(sessions.begin()); + sessions.emplace_back(sessionFuture.ExtractValueSync()); + } + ASSERT_EQ(client.GetActiveSessionCount(), activeSessionsLimit); +} + +TEST_P(YdbSdkSessionsPool, WaitQueue) { + TestWaitQueue(*Client, GetParam()); +} + +TEST_P(YdbSdkSessionsPool1Session, RunSmallPlan) { + TPlan plan{ + {EAction::CreateFuture, 1}, + {EAction::ExtractValue, 1}, + {EAction::CreateFuture, 2}, + {EAction::Return, 1}, + {EAction::ExtractValue, 2}, + {EAction::Return, 2} + }; + CheckPlan(plan); + RunPlan(plan, *Client); + + ASSERT_EQ(Client->GetActiveSessionCount(), 0); + ASSERT_EQ(Client->GetCurrentPoolSize(), 1); +} + +TEST_P(YdbSdkSessionsPool1Session, CustomPlan) { + TPlan plan{ + {EAction::CreateFuture, 1} + }; + CheckPlan(plan); + RunPlan(plan, *Client); + + std::this_thread::sleep_for(std::chrono::milliseconds(10000)); + + ASSERT_EQ(Client->GetActiveSessionCount(), 0); +} + +ui32 RunStressTestSync(ui32 n, ui32 activeSessionsLimit, NYdb::NTable::TTableClient& client) { + std::vector<NThreading::TFuture<NYdb::NTable::TCreateSessionResult>> sessionFutures; + std::vector<NYdb::NTable::TCreateSessionResult> sessions; + std::mt19937 rng(0); + ui32 successCount = 0; + + for (ui32 i = 0; i < activeSessionsLimit * 12; ++i) { + sessionFutures.emplace_back(client.GetSession()); + } + + for (ui32 i = 0; i < n; ++i) { + switch (static_cast<EAction>(GetRand(rng, 0, 2))) { + case EAction::CreateFuture: { + sessionFutures.emplace_back(client.GetSession()); + break; + } + case EAction::ExtractValue: { + if (sessionFutures.empty()) { + break; + } + auto ind = GetRand(rng, 0, sessionFutures.size() - 1); + auto sessionFuture = sessionFutures[ind]; + if (sessionFuture.HasValue()) { + auto session = sessionFuture.ExtractValueSync(); + if (session.IsSuccess()) { + ++successCount; + } + sessions.emplace_back(std::move(session)); + sessionFutures.erase(sessionFutures.begin() + ind); + break; + } + break; + } + case EAction::Return: { + if (sessions.empty()) { + break; + } + auto ind = GetRand(rng, 0, sessions.size() - 1); + sessions.erase(sessions.begin() + ind); + break; + } + } + } + return successCount; +} + +TEST_P(YdbSdkSessionsPool, StressTestSync) { + ui32 activeSessionsLimit = GetParam(); + + RunStressTestSync(1000, activeSessionsLimit, *Client); + + std::this_thread::sleep_for(std::chrono::milliseconds(10000)); + + ASSERT_EQ(Client->GetActiveSessionCount(), 0); + ASSERT_EQ(Client->GetCurrentPoolSize(), activeSessionsLimit); +} + +ui32 RunStressTestAsync(ui32 n, ui32 nThreads, NYdb::NTable::TTableClient& client) { + std::atomic<ui32> successCount(0); + std::atomic<ui32> jobIndex(0); + + auto job = [&client, &successCount, &jobIndex, n]() mutable { + std::mt19937 rng(++jobIndex); + for (ui32 i = 0; i < n; ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds(GetRand(rng, 1, 100))); + auto sessionFuture = client.GetSession(); + std::this_thread::sleep_for(std::chrono::milliseconds(GetRand(rng, 1, 100))); + auto session = sessionFuture.ExtractValueSync(); + std::this_thread::sleep_for(std::chrono::milliseconds(GetRand(rng, 1, 100))); + successCount += session.IsSuccess(); + } + }; + + std::vector<std::thread> threads; + for (ui32 i = 0; i < nThreads; i++) { + threads.emplace_back(job); + } + for (auto& thread: threads) { + thread.join(); + } + + return successCount; +} + +TEST_P(YdbSdkSessionsPool, StressTestAsync) { + ui32 activeSessionsLimit = GetParam(); + ui32 iterations = (activeSessionsLimit == 1) ? 100 : 1000; + + RunStressTestAsync(iterations, 10, *Client); + + std::this_thread::sleep_for(std::chrono::milliseconds(10000)); + + ASSERT_EQ(Client->GetActiveSessionCount(), 0); + ASSERT_EQ(Client->GetCurrentPoolSize(), activeSessionsLimit); +} + +void TestPeriodicTask(ui32 activeSessionsLimit, NYdb::NTable::TTableClient& client) { + std::vector<NThreading::TFuture<NYdb::NTable::TCreateSessionResult>> sessionFutures; + std::vector<NYdb::NTable::TCreateSessionResult> sessions; + + for (ui32 i = 0; i < activeSessionsLimit; ++i) { + sessions.emplace_back(client.GetSession().ExtractValueSync()); + ASSERT_TRUE(sessions.back().IsSuccess()); + } + + for (ui32 i = 0; i < activeSessionsLimit; ++i) { + sessionFutures.emplace_back(client.GetSession()); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + for (auto& sessionFuture : sessionFutures) { + ASSERT_FALSE(sessionFuture.HasValue()); + } + + // Wait for wait session timeout + std::this_thread::sleep_for(std::chrono::milliseconds(10000)); + + for (auto& sessionFuture : sessionFutures) { + ASSERT_TRUE(sessionFuture.HasValue()); + ASSERT_FALSE(sessionFuture.ExtractValueSync().IsSuccess()); + } + + ASSERT_EQ(client.GetActiveSessionCount(), activeSessionsLimit); + + sessionFutures.clear(); + sessions.clear(); + + std::this_thread::sleep_for(std::chrono::milliseconds(10000)); + ASSERT_EQ(client.GetActiveSessionCount(), 0); + ASSERT_EQ(client.GetCurrentPoolSize(), activeSessionsLimit); +} + +TEST_P(YdbSdkSessionsPool, PeriodicTask) { + TestPeriodicTask(GetParam(), *Client); +} + +TEST_P(YdbSdkSessionsPool1Session, FailTest) { + // This test reproduces bug from KIKIMR-18063 + auto sessionFromPool = Client->GetSession().ExtractValueSync(); + auto futureInWaitPool = Client->GetSession(); + + { + auto standaloneSessionThatWillBeBroken = Client->CreateSession().ExtractValueSync(); + auto res = standaloneSessionThatWillBeBroken.GetSession().ExecuteDataQuery("SELECT COUNT(*) FROM `Root/Test`;", + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), + NYdb::NTable::TExecDataQuerySettings().ClientTimeout(TDuration::MicroSeconds(10))).GetValueSync(); + } +} + +INSTANTIATE_TEST_SUITE_P(, YdbSdkSessionsPool, ::testing::Values(1, 10)); + +INSTANTIATE_TEST_SUITE_P(, YdbSdkSessionsPool1Session, ::testing::Values(1)); diff --git a/ydb/services/ydb/sdk_sessions_pool_ut/ya.make b/ydb/public/sdk/cpp/tests/integration/sessions_pool/ya.make index ae03a20c02e..998a6caec44 100644 --- a/ydb/services/ydb/sdk_sessions_pool_ut/ya.make +++ b/ydb/public/sdk/cpp/tests/integration/sessions_pool/ya.make @@ -1,4 +1,5 @@ -UNITTEST_FOR(ydb/services/ydb) +GTEST() +INCLUDE(${ARCADIA_ROOT}/ydb/public/tools/ydb_recipe/recipe.inc) FORK_SUBTESTS() @@ -11,13 +12,11 @@ ELSE() ENDIF() SRCS( - sdk_sessions_pool_ut.cpp + main.cpp ) PEERDIR( ydb/public/sdk/cpp/src/library/grpc/client - ydb/core/testlib/default - ydb/core/testlib ydb/public/sdk/cpp/src/client/table ) diff --git a/ydb/public/sdk/cpp/tests/integration/ya.make b/ydb/public/sdk/cpp/tests/integration/ya.make index e801ac6b63a..d55b85646fe 100644 --- a/ydb/public/sdk/cpp/tests/integration/ya.make +++ b/ydb/public/sdk/cpp/tests/integration/ya.make @@ -2,4 +2,6 @@ RECURSE( basic_example bulk_upsert server_restart + sessions + sessions_pool ) diff --git a/ydb/services/ydb/sdk_sessions_pool_ut/sdk_sessions_pool_ut.cpp b/ydb/services/ydb/sdk_sessions_pool_ut/sdk_sessions_pool_ut.cpp deleted file mode 100644 index 0a60c556b63..00000000000 --- a/ydb/services/ydb/sdk_sessions_pool_ut/sdk_sessions_pool_ut.cpp +++ /dev/null @@ -1,448 +0,0 @@ -#include "ydb_common_ut.h" - -#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h> -#include <ydb/public/api/grpc/ydb_table_v1.grpc.pb.h> - -#include <util/thread/pool.h> - -#include <random> -#include <thread> - -using namespace NYdb; -using namespace NYdb::NTable; - -class TDefaultTestSetup { -public: - TDefaultTestSetup(ui32 maxActiveSessions) - : Driver_(NYdb::TDriver( - TDriverConfig().SetEndpoint( - TStringBuilder() << "localhost:" << Server_.GetPort() - ) - )) - , Client_( - Driver_, - TClientSettings().SessionPoolSettings( - TSessionPoolSettings() - .MaxActiveSessions(maxActiveSessions) - .KeepAliveIdleThreshold(TDuration::MilliSeconds(10)) - .CloseIdleThreshold(TDuration::MilliSeconds(10)) - ) - ) - { - } - - ~TDefaultTestSetup() { - Driver_.Stop(true); - } - - NYdb::NTable::TTableClient& GetClient() { - return Client_; - } - -private: - TKikimrWithGrpcAndRootSchema Server_; - NYdb::TDriver Driver_; - NYdb::NTable::TTableClient Client_; -}; - - -enum class EAction: ui8 { - CreateFuture, - ExtractValue, - Return -}; -using TPlan = TVector<std::pair<EAction, ui32>>; - - -void CheckPlan(TPlan plan) { - THashMap<ui32, EAction> sessions; - for (const auto& [action, sessionId]: plan) { - if (action == EAction::CreateFuture) { - UNIT_ASSERT(!sessions.contains(sessionId)); - } else { - UNIT_ASSERT(sessions.contains(sessionId)); - switch (sessions.at(sessionId)) { - case EAction::CreateFuture: { - UNIT_ASSERT(action == EAction::ExtractValue); - break; - } - case EAction::ExtractValue: { - UNIT_ASSERT(action == EAction::Return); - break; - } - default: { - UNIT_ASSERT(false); - } - } - } - sessions[sessionId] = action; - } -} - -void RunPlan(const TPlan& plan, NYdb::NTable::TTableClient& client) { - THashMap<ui32, NThreading::TFuture<NYdb::NTable::TCreateSessionResult>> sessionFutures; - THashMap<ui32, NYdb::NTable::TCreateSessionResult> sessions; - - ui32 requestedSessions = 0; - - for (const auto& [action, sessionId]: plan) { - switch (action) { - case EAction::CreateFuture: { - sessionFutures.emplace(sessionId, client.GetSession()); - ++requestedSessions; - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - if (requestedSessions > client.GetActiveSessionsLimit()) { - UNIT_ASSERT(client.GetActiveSessionCount() == client.GetActiveSessionsLimit()); - } - UNIT_ASSERT(!sessionFutures.at(sessionId).HasValue()); - break; - } - case EAction::ExtractValue: { - auto it = sessionFutures.find(sessionId); - auto session = it->second.ExtractValueSync(); - sessionFutures.erase(it); - sessions.emplace(sessionId, std::move(session)); - break; - } - case EAction::Return: { - sessions.erase(sessionId); - --requestedSessions; - break; - } - } - UNIT_ASSERT(client.GetActiveSessionCount() <= client.GetActiveSessionsLimit()); - UNIT_ASSERT(client.GetActiveSessionCount() >= static_cast<i64>(sessions.size())); - UNIT_ASSERT(client.GetActiveSessionCount() <= static_cast<i64>(sessions.size() + sessionFutures.size())); - } -} - -int GetRand(std::mt19937& rng, int min, int max) { - std::uniform_int_distribution<std::mt19937::result_type> dist(min, max); - return dist(rng); -} - - -TPlan GenerateRandomPlan(ui32 numSessions) { - TPlan plan; - std::random_device dev; - std::mt19937 rng(dev()); - - for (ui32 i = 0; i < numSessions; ++i) { - std::uniform_int_distribution<std::mt19937::result_type> dist(0, plan.size()); - ui32 prevPos = 0; - for (EAction action: {EAction::CreateFuture, EAction::ExtractValue, EAction::Return}) { - int pos = GetRand(rng, prevPos, plan.size()); - plan.emplace(plan.begin() + pos, std::make_pair(action, i)); - prevPos = pos + 1; - } - } - return plan; -} - - -Y_UNIT_TEST_SUITE(YdbSdkSessionsPool) { - Y_UNIT_TEST(Get1Session) { - TDefaultTestSetup setup(1); - auto& client = setup.GetClient(); - - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionsLimit(), 1); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), 0); - - { - auto session = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1); - UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), 0); - } - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), 1); - } - - void TestWaitQueue(NYdb::NTable::TTableClient& client, ui32 activeSessionsLimit) { - std::vector<NThreading::TFuture<NYdb::NTable::TCreateSessionResult>> sessionFutures; - std::vector<NYdb::NTable::TCreateSessionResult> sessions; - - // exhaust the pool - for (ui32 i = 0; i < activeSessionsLimit; ++i) { - sessions.emplace_back(client.GetSession().ExtractValueSync()); - } - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), activeSessionsLimit); - - // next should be in the wait queue - for (ui32 i = 0; i < activeSessionsLimit * 10; ++i) { - sessionFutures.emplace_back(client.GetSession()); - } - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), activeSessionsLimit); - - // next should be a fake session - { - auto brokenSession = client.GetSession().ExtractValueSync(); - UNIT_ASSERT(!brokenSession.IsSuccess()); - } - - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - for (auto& sessionFuture: sessionFutures) { - UNIT_ASSERT(!sessionFuture.HasValue()); - } - - for (auto& sessionFuture: sessionFutures) { - sessions.erase(sessions.begin()); - sessions.emplace_back(sessionFuture.ExtractValueSync()); - } - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), activeSessionsLimit); - } - - Y_UNIT_TEST(WaitQueue1) { - ui32 activeSessionsLimit = 1; - - TDefaultTestSetup setup(activeSessionsLimit); - auto& client = setup.GetClient(); - - TestWaitQueue(client, activeSessionsLimit); - } - - Y_UNIT_TEST(WaitQueue10) { - ui32 activeSessionsLimit = 10; - - TDefaultTestSetup setup(activeSessionsLimit); - auto& client = setup.GetClient(); - - TestWaitQueue(client, activeSessionsLimit); - } - - Y_UNIT_TEST(RunSmallPlan) { - TDefaultTestSetup setup(1); - auto& client = setup.GetClient(); - - TPlan plan{ - {EAction::CreateFuture, 1}, - {EAction::ExtractValue, 1}, - {EAction::CreateFuture, 2}, - {EAction::Return, 1}, - {EAction::ExtractValue, 2}, - {EAction::Return, 2} - }; - CheckPlan(plan); - RunPlan(plan, client); - - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), 1); - } - - Y_UNIT_TEST(CustomPlan) { - TDefaultTestSetup setup(1); - auto& client = setup.GetClient(); - - TPlan plan{ - {EAction::CreateFuture, 1} - }; - CheckPlan(plan); - RunPlan(plan, client); - - std::this_thread::sleep_for(std::chrono::milliseconds(10000)); - - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - } - - ui32 RunStressTestSync(ui32 n, ui32 activeSessionsLimit, NYdb::NTable::TTableClient& client) { - std::vector<NThreading::TFuture<NYdb::NTable::TCreateSessionResult>> sessionFutures; - std::vector<NYdb::NTable::TCreateSessionResult> sessions; - std::mt19937 rng(0); - ui32 successCount = 0; - - for (ui32 i = 0; i < activeSessionsLimit * 12; ++i) { - sessionFutures.emplace_back(client.GetSession()); - } - - for (ui32 i = 0; i < n; ++i) { - switch (static_cast<EAction>(GetRand(rng, 0, 2))) { - case EAction::CreateFuture: { - sessionFutures.emplace_back(client.GetSession()); - break; - } - case EAction::ExtractValue: { - if (sessionFutures.empty()) { - break; - } - auto ind = GetRand(rng, 0, sessionFutures.size() - 1); - auto sessionFuture = sessionFutures[ind]; - if (sessionFuture.HasValue()) { - auto session = sessionFuture.ExtractValueSync(); - if (session.IsSuccess()) { - ++successCount; - } - sessions.emplace_back(std::move(session)); - sessionFutures.erase(sessionFutures.begin() + ind); - break; - } - break; - } - case EAction::Return: { - if (sessions.empty()) { - break; - } - auto ind = GetRand(rng, 0, sessions.size() - 1); - sessions.erase(sessions.begin() + ind); - break; - } - } - } - return successCount; - } - - Y_UNIT_TEST(StressTestSync1) { - ui32 activeSessionsLimit = 1; - - TDefaultTestSetup setup(activeSessionsLimit); - auto& client = setup.GetClient(); - - RunStressTestSync(1000, activeSessionsLimit, client); - - std::this_thread::sleep_for(std::chrono::milliseconds(10000)); - - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), activeSessionsLimit); - } - - Y_UNIT_TEST(StressTestSync10) { - ui32 activeSessionsLimit = 10; - - TDefaultTestSetup setup(activeSessionsLimit); - auto& client = setup.GetClient(); - - RunStressTestSync(1000, activeSessionsLimit, client); - - std::this_thread::sleep_for(std::chrono::milliseconds(10000)); - - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), activeSessionsLimit); - } - - ui32 RunStressTestAsync(ui32 n, ui32 nThreads, NYdb::NTable::TTableClient& client) { - std::atomic<ui32> successCount(0); - std::atomic<ui32> jobIndex(0); - - auto job = [&client, &successCount, &jobIndex, n]() mutable { - std::mt19937 rng(++jobIndex); - for (ui32 i = 0; i < n; ++i) { - std::this_thread::sleep_for(std::chrono::milliseconds(GetRand(rng, 1, 100))); - auto sessionFuture = client.GetSession(); - std::this_thread::sleep_for(std::chrono::milliseconds(GetRand(rng, 1, 100))); - auto session = sessionFuture.ExtractValueSync(); - std::this_thread::sleep_for(std::chrono::milliseconds(GetRand(rng, 1, 100))); - successCount += session.IsSuccess(); - } - }; - - IThreadFactory* pool = SystemThreadFactory(); - TVector<TAutoPtr<IThreadFactory::IThread>> threads; - threads.resize(nThreads); - for (ui32 i = 0; i < nThreads; i++) { - threads[i] = pool->Run(job); - } - for (ui32 i = 0; i < nThreads; i++) { - threads[i]->Join(); - } - - return successCount; - } - - Y_UNIT_TEST(StressTestAsync1) { - ui32 activeSessionsLimit = 1; - - TDefaultTestSetup setup(activeSessionsLimit); - auto& client = setup.GetClient(); - - RunStressTestAsync(100, 10, client); - - std::this_thread::sleep_for(std::chrono::milliseconds(10000)); - - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), activeSessionsLimit); - } - - Y_UNIT_TEST(StressTestAsync10) { - ui32 activeSessionsLimit = 10; - - TDefaultTestSetup setup(activeSessionsLimit); - auto& client = setup.GetClient(); - - RunStressTestAsync(1000, 10, client); - - std::this_thread::sleep_for(std::chrono::milliseconds(10000)); - - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), activeSessionsLimit); - } - - void TestPeriodicTask(ui32 activeSessionsLimit, NYdb::NTable::TTableClient& client) { - std::vector<NThreading::TFuture<NYdb::NTable::TCreateSessionResult>> sessionFutures; - std::vector<NYdb::NTable::TCreateSessionResult> sessions; - - for (ui32 i = 0; i < activeSessionsLimit; ++i) { - sessions.emplace_back(client.GetSession().ExtractValueSync()); - UNIT_ASSERT_VALUES_EQUAL(sessions.back().IsSuccess(), true); - } - - for (ui32 i = 0; i < activeSessionsLimit; ++i) { - sessionFutures.emplace_back(client.GetSession()); - } - - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - - for (auto& sessionFuture : sessionFutures) { - UNIT_ASSERT(!sessionFuture.HasValue()); - } - - // Wait for wait session timeout - std::this_thread::sleep_for(std::chrono::milliseconds(10000)); - - for (auto& sessionFuture : sessionFutures) { - UNIT_ASSERT(sessionFuture.HasValue()); - UNIT_ASSERT(!sessionFuture.ExtractValueSync().IsSuccess()); - } - - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), activeSessionsLimit); - - sessionFutures.clear(); - sessions.clear(); - - std::this_thread::sleep_for(std::chrono::milliseconds(10000)); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), activeSessionsLimit); - } - - Y_UNIT_TEST(PeriodicTask1) { - ui32 activeSessionsLimit = 1; - - TDefaultTestSetup setup(activeSessionsLimit); - auto& client = setup.GetClient(); - - TestPeriodicTask(activeSessionsLimit, client); - } - - Y_UNIT_TEST(PeriodicTask10) { - ui32 activeSessionsLimit = 10; - - TDefaultTestSetup setup(activeSessionsLimit); - auto& client = setup.GetClient(); - - TestPeriodicTask(activeSessionsLimit, client); - } - - Y_UNIT_TEST(FailTest) { - // This test reproduces bug from KIKIMR-18063 - TDefaultTestSetup setup(1); - auto& client = setup.GetClient(); - - auto sessionFromPool = client.GetSession().ExtractValueSync(); - auto futureInWaitPool = client.GetSession(); - - { - auto standaloneSessionThatWillBeBroken = client.CreateSession().ExtractValueSync(); - auto res = standaloneSessionThatWillBeBroken.GetSession().ExecuteDataQuery("SELECT COUNT(*) FROM `Root/Test`;", - TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), - NYdb::NTable::TExecDataQuerySettings().ClientTimeout(TDuration::MicroSeconds(10))).GetValueSync(); - } - } -} diff --git a/ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp b/ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp deleted file mode 100644 index 85312403144..00000000000 --- a/ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp +++ /dev/null @@ -1,831 +0,0 @@ -#include "ydb_common_ut.h" -#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h> -#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/query/client.h> - -#include <ydb/public/api/grpc/ydb_table_v1.grpc.pb.h> - -#include <ydb/public/lib/ut_helpers/ut_helpers_query.h> - -using namespace NYdb; -using namespace NYdb::NTable; - -namespace { - -void CreateTestTable(NYdb::TDriver& driver) { - NYdb::NTable::TTableClient client(driver); - auto sessionResponse = client.GetSession().ExtractValueSync(); - UNIT_ASSERT(sessionResponse.IsSuccess()); - auto session = sessionResponse.GetSession(); - auto result = session.ExecuteSchemeQuery(R"___( - CREATE TABLE `Root/Test` ( - Key Uint32, - Value String, - PRIMARY KEY (Key) - ); - )___").ExtractValueSync(); - UNIT_ASSERT(result.IsSuccess()); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1); -} - -TString WarmPoolCreateSession(NYdb::NQuery::TQueryClient& client) { - TString sessionId; - auto sessionResponse = client.GetSession().ExtractValueSync(); - UNIT_ASSERT(sessionResponse.IsSuccess()); - auto session = sessionResponse.GetSession(); - sessionId = session.GetId(); - auto res = session.ExecuteQuery("SELECT * FROM `Root/Test`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); - - UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString()); - - TResultSetParser resultSet(res.GetResultSetParser(0)); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2); - - return sessionId; -} - -void WaitForSessionsInPool(NYdb::NQuery::TQueryClient& client, i64 expected) { - int attempt = 10; - while (attempt--) { - if (client.GetCurrentPoolSize() == expected) - break; - Sleep(TDuration::MilliSeconds(100)); - } - UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), expected); -} - -} - -Y_UNIT_TEST_SUITE(YdbSdkSessions) { - Y_UNIT_TEST(TestSessionPool) { - TKikimrWithGrpcAndRootSchema server; - ui16 grpc = server.GetPort(); - - const TString location = TStringBuilder() << "localhost:" << grpc; - - auto driver = NYdb::TDriver( - TDriverConfig() - .SetEndpoint(location)); - - NYdb::NTable::TTableClient client(driver); - int count = 10; - - THashSet<std::string> sids; - while (count--) { - auto sessionResponse = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_EQUAL(sessionResponse.IsTransportError(), false); - auto session = sessionResponse.GetSession(); - sids.insert(session.GetId()); - auto result = session.ExecuteDataQuery("SELECT 42;", - TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); - - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(result.GetEndpoint(), location); - } - // All requests used one session - UNIT_ASSERT_VALUES_EQUAL(sids.size(), 1); - // No more session captured by client - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - - driver.Stop(true); - } - - Y_UNIT_TEST(TestMultipleSessions) { - TKikimrWithGrpcAndRootSchema server; - ui16 grpc = server.GetPort(); - - TString location = TStringBuilder() << "localhost:" << grpc; - - auto driver = NYdb::TDriver( - TDriverConfig() - .SetEndpoint(location)); - - NYdb::NTable::TTableClient client(driver); - int count = 10; - - TVector<TSession> sids; - TVector<TAsyncDataQueryResult> results; - while (count--) { - auto sessionResponse = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_EQUAL(sessionResponse.IsTransportError(), false); - auto session = sessionResponse.GetSession(); - sids.push_back(session); - results.push_back(session.ExecuteDataQuery("SELECT 42;", - TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx())); - } - - NThreading::WaitExceptionOrAll(results).Wait(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 10); - - for (auto& result : results) { - UNIT_ASSERT_EQUAL(result.GetValue().GetStatus(), EStatus::SUCCESS); - } - sids.clear(); - results.clear(); - - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - - driver.Stop(true); - } - - Y_UNIT_TEST(TestActiveSessionCountAfterBadSession) { - TKikimrWithGrpcAndRootSchema server; - ui16 grpc = server.GetPort(); - - TString location = TStringBuilder() << "localhost:" << grpc; - - auto driver = NYdb::TDriver( - TDriverConfig() - .SetEndpoint(location)); - - NYdb::NTable::TTableClient client(driver); - int count = 10; - - TVector<TSession> sids; - TVector<TAsyncDataQueryResult> results; - while (count--) { - auto sessionResponse = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_EQUAL(sessionResponse.IsTransportError(), false); - auto session = sessionResponse.GetSession(); - sids.push_back(session); - if (count == 0) { - // Force BAD session server response for ExecuteDataQuery - UNIT_ASSERT_EQUAL(session.Close().GetValueSync().GetStatus(), EStatus::SUCCESS); - results.push_back(session.ExecuteDataQuery("SELECT 42;", - TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx())); - } else { - results.push_back(session.ExecuteDataQuery("SELECT 42;", - TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx())); - } - } - - NThreading::WaitExceptionOrAll(results).Wait(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 10); - - for (size_t i = 0; i < results.size(); i++) { - if (i == 9) { - UNIT_ASSERT_EQUAL(results[i].GetValue().GetStatus(), EStatus::BAD_SESSION); - } else { - UNIT_ASSERT_EQUAL(results[i].GetValue().GetStatus(), EStatus::SUCCESS); - } - } - sids.clear(); - results.clear(); - - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - - driver.Stop(true); - } - - Y_UNIT_TEST(TestSdkFreeSessionAfterBadSessionQueryService) { - TKikimrWithGrpcAndRootSchema server; - ui16 grpc = server.GetPort(); - - TString location = TStringBuilder() << "localhost:" << grpc; - auto clientConfig = NGRpcProxy::TGRpcClientConfig(location); - - auto driver = NYdb::TDriver( - TDriverConfig() - .SetEndpoint(location)); - - CreateTestTable(driver); - - NYdb::NQuery::TQueryClient client(driver); - TString sessionId = WarmPoolCreateSession(client); - WaitForSessionsInPool(client, 1); - - bool allDoneOk = true; - NTestHelpers::CheckDelete(clientConfig, sessionId, Ydb::StatusIds::SUCCESS, allDoneOk); - UNIT_ASSERT(allDoneOk); - - { - auto sessionResponse = client.GetSession().ExtractValueSync(); - UNIT_ASSERT(sessionResponse.IsSuccess()); - auto session = sessionResponse.GetSession(); - UNIT_ASSERT_VALUES_EQUAL(session.GetId(), sessionId); - - auto res = session.ExecuteQuery("SELECT * FROM `Root/Test`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); - - UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::BAD_SESSION, res.GetIssues().ToString()); - } - - WaitForSessionsInPool(client, 0); - - { - auto sessionResponse = client.GetSession().ExtractValueSync(); - UNIT_ASSERT(sessionResponse.IsSuccess()); - auto session = sessionResponse.GetSession(); - UNIT_ASSERT_VALUES_UNEQUAL(session.GetId(), sessionId); - auto res = session.ExecuteQuery("SELECT * FROM `Root/Test`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); - - UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString()); - } - - WaitForSessionsInPool(client, 1); - - driver.Stop(true); - } - - Y_UNIT_TEST(TestSdkFreeSessionAfterBadSessionQueryServiceStreamCall) { - TKikimrWithGrpcAndRootSchema server; - ui16 grpc = server.GetPort(); - - TString location = TStringBuilder() << "localhost:" << grpc; - auto clientConfig = NGRpcProxy::TGRpcClientConfig(location); - - auto driver = NYdb::TDriver( - TDriverConfig() - .SetEndpoint(location)); - - CreateTestTable(driver); - - NYdb::NQuery::TQueryClient client(driver); - TString sessionId = WarmPoolCreateSession(client); - WaitForSessionsInPool(client, 1); - - bool allDoneOk = true; - NTestHelpers::CheckDelete(clientConfig, sessionId, Ydb::StatusIds::SUCCESS, allDoneOk); - UNIT_ASSERT(allDoneOk); - - { - auto sessionResponse = client.GetSession().ExtractValueSync(); - UNIT_ASSERT(sessionResponse.IsSuccess()); - auto session = sessionResponse.GetSession(); - UNIT_ASSERT_VALUES_EQUAL(session.GetId(), sessionId); - - auto it = session.StreamExecuteQuery("SELECT * FROM `Root/Test`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); - - UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString()); - - auto res = it.ReadNext().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::BAD_SESSION, res.GetIssues().ToString()); - } - - WaitForSessionsInPool(client, 0); - - { - auto sessionResponse = client.GetSession().ExtractValueSync(); - UNIT_ASSERT(sessionResponse.IsSuccess()); - auto session = sessionResponse.GetSession(); - UNIT_ASSERT_VALUES_UNEQUAL(session.GetId(), sessionId); - - auto res = session.ExecuteQuery("SELECT * FROM `Root/Test`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); - - UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString()); - } - - WaitForSessionsInPool(client, 1); - - driver.Stop(true); - } - - Y_UNIT_TEST(TestActiveSessionCountAfterTransportError) { - TKikimrWithGrpcAndRootSchema server; - ui16 grpc = server.GetPort(); - - TString location = TStringBuilder() << "localhost:" << grpc; - - auto driver = NYdb::TDriver( - TDriverConfig() - .SetEndpoint(location)); - - NYdb::NTable::TTableClient client(driver); - int count = 100; - - { - auto sessionResponse = client.GetSession().ExtractValueSync(); - UNIT_ASSERT(sessionResponse.IsSuccess()); - auto session = sessionResponse.GetSession(); - auto result = session.ExecuteSchemeQuery(R"___( - CREATE TABLE `Root/Test` ( - Key Uint32, - Value String, - PRIMARY KEY (Key) - ); - )___").ExtractValueSync(); - UNIT_ASSERT(result.IsSuccess()); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1); - } - - while (count--) { - auto sessionResponse = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_EQUAL(sessionResponse.IsTransportError(), false); - auto session = sessionResponse.GetSession(); - - // Assume 10us is too small to execute query and get response - auto res = session.ExecuteDataQuery("SELECT COUNT(*) FROM `Root/Test`;", - TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), - NYdb::NTable::TExecDataQuerySettings().ClientTimeout(TDuration::MicroSeconds(10))).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1); - } - - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - driver.Stop(true); - } - - Y_UNIT_TEST(MultiThreadSync) { - TKikimrWithGrpcAndRootSchema server; - ui16 grpc = server.GetPort(); - - TString location = TStringBuilder() << "localhost:" << grpc; - - auto driver = NYdb::TDriver( - TDriverConfig() - .SetEndpoint(location)); - - NYdb::NTable::TTableClient client(driver); - const int nThreads = 10; - const int nRequests = 1000; - auto job = [client]() mutable { - for (int i = 0; i < nRequests; i++) { - auto sessionResponse = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_EQUAL(sessionResponse.GetStatus(), EStatus::SUCCESS); - } - }; - IThreadFactory* pool = SystemThreadFactory(); - - TVector<TAutoPtr<IThreadFactory::IThread>> threads; - threads.resize(nThreads); - for (int i = 0; i < nThreads; i++) { - threads[i] = pool->Run(job); - } - for (int i = 0; i < nThreads; i++) { - threads[i]->Join(); - } - UNIT_ASSERT_EQUAL(client.GetActiveSessionCount(), 0); - driver.Stop(true); - } - - void EnsureCanExecQuery(NYdb::NTable::TSession session) { - auto execStatus = session.ExecuteDataQuery("SELECT 42;", - TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync().GetStatus(); - UNIT_ASSERT_EQUAL(execStatus, EStatus::SUCCESS); - } - - void EnsureCanExecQuery(NYdb::NQuery::TSession session) { - auto execStatus = session.ExecuteQuery("SELECT 42;", - NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync().GetStatus(); - UNIT_ASSERT_EQUAL(execStatus, EStatus::SUCCESS); - } - - template<typename TClient> - void DoMultiThreadSessionPoolLimitSync() { - TKikimrWithGrpcAndRootSchema server; - ui16 grpc = server.GetPort(); - - TString location = TStringBuilder() << "localhost:" << grpc; - - auto driver = NYdb::TDriver( - TDriverConfig() - .SetEndpoint(location)); - - const int maxActiveSessions = 45; - TClient client(driver, - typename TClient::TSettings() - .SessionPoolSettings( - typename TClient::TSettings::TSessionPoolSettings().MaxActiveSessions(maxActiveSessions))); - - constexpr int nThreads = 100; - NYdb::EStatus statuses[nThreads]; - TVector<TMaybe<typename TClient::TSession>> sessions; - sessions.resize(nThreads); - TAtomic t = 0; - auto job = [client, &t, &statuses, &sessions]() mutable { - auto sessionResponse = client.GetSession().ExtractValueSync(); - int i = AtomicIncrement(t); - statuses[--i] = sessionResponse.GetStatus(); - if (statuses[i] == EStatus::SUCCESS) { - EnsureCanExecQuery(sessionResponse.GetSession()); - sessions[i] = sessionResponse.GetSession(); - } - }; - IThreadFactory* pool = SystemThreadFactory(); - - TVector<TAutoPtr<IThreadFactory::IThread>> threads; - threads.resize(nThreads); - for (int i = 0; i < nThreads; i++) { - threads[i] = pool->Run(job); - } - for (int i = 0; i < nThreads; i++) { - threads[i]->Join(); - } - - sessions.clear(); - - int successCount = 0; - int exhaustedCount = 0; - for (int i = 0; i < nThreads; i++) { - switch (statuses[i]) { - case EStatus::SUCCESS: - successCount++; - break; - case EStatus::CLIENT_RESOURCE_EXHAUSTED: - exhaustedCount++; - break; - default: - UNIT_ASSERT(false); - } - } - - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - UNIT_ASSERT_VALUES_EQUAL(successCount, maxActiveSessions); - UNIT_ASSERT_VALUES_EQUAL(exhaustedCount, nThreads - maxActiveSessions); - driver.Stop(true); - } - - Y_UNIT_TEST(MultiThreadSessionPoolLimitSyncTableClient) { - DoMultiThreadSessionPoolLimitSync<NYdb::NTable::TTableClient>(); - } - - Y_UNIT_TEST(MultiThreadSessionPoolLimitSyncQueryClient) { - DoMultiThreadSessionPoolLimitSync<NYdb::NQuery::TQueryClient>(); - } - - NYdb::NTable::TAsyncDataQueryResult ExecQueryAsync(NYdb::NTable::TSession session, const TString q) { - return session.ExecuteDataQuery(q, - TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()); - } - - NYdb::NQuery::TAsyncExecuteQueryResult ExecQueryAsync(NYdb::NQuery::TSession session, const TString q) { - return session.ExecuteQuery(q, - NYdb::NQuery::TTxControl::BeginTx().CommitTx()); - } - - template<typename T> - void DoMultiThreadMultipleRequestsOnSharedSessions() { - TKikimrWithGrpcAndRootSchema server; - ui16 grpc = server.GetPort(); - - TString location = TStringBuilder() << "localhost:" << grpc; - - auto driver = NYdb::TDriver( - TDriverConfig() - .SetEndpoint(location)); - - const int maxActiveSessions = 10; - typename T::TClient client(driver, - typename T::TClient::TSettings() - .SessionPoolSettings( - typename T::TClient::TSettings::TSessionPoolSettings().MaxActiveSessions(maxActiveSessions))); - - constexpr int nThreads = 20; - constexpr int nRequests = 50; - std::array<TVector<typename T::TResult>, nThreads> results; - TAtomic t = 0; - TAtomic validSessions = 0; - auto job = [client, &t, &results, &validSessions]() mutable { - auto sessionResponse = client.GetSession().ExtractValueSync(); - - int i = AtomicIncrement(t); - TVector<typename T::TResult>& r = results[--i]; - - if (sessionResponse.GetStatus() != EStatus::SUCCESS) { - return; - } - AtomicIncrement(validSessions); - - for (int i = 0; i < nRequests; i++) { - r.push_back(ExecQueryAsync(sessionResponse.GetSession(), "SELECT 42;")); - } - }; - IThreadFactory* pool = SystemThreadFactory(); - - TVector<TAutoPtr<IThreadFactory::IThread>> threads; - threads.resize(nThreads); - for (int i = 0; i < nThreads; i++) { - threads[i] = pool->Run(job); - } - for (int i = 0; i < nThreads; i++) { - threads[i]->Join(); - } - - for (auto& r : results) { - NThreading::WaitExceptionOrAll(r).Wait(); - } - for (auto& r : results) { - if (!r.empty()) { - for (auto& asyncStatus : r) { - auto res = asyncStatus.GetValue(); - if (!res.IsSuccess()) { - UNIT_ASSERT_VALUES_EQUAL(res.GetStatus(), EStatus::SESSION_BUSY); - } - } - } - } - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), maxActiveSessions); - auto curExpectedActive = maxActiveSessions; - auto empty = 0; - for (auto& r : results) { - if (!r.empty()) { - r.clear(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), --curExpectedActive); - } else { - empty++; - } - } - UNIT_ASSERT_VALUES_EQUAL(empty, nThreads - maxActiveSessions); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - driver.Stop(true); - } - - Y_UNIT_TEST(MultiThreadMultipleRequestsOnSharedSessionsTableClient) { - struct TTypeHelper { - using TClient = NYdb::NTable::TTableClient; - using TResult = NYdb::NTable::TAsyncDataQueryResult; - }; - DoMultiThreadMultipleRequestsOnSharedSessions<TTypeHelper>(); - } - - // Enable after interactive tx support - //Y_UNIT_TEST(MultiThreadMultipleRequestsOnSharedSessionsQueryClient) { - // struct TTypeHelper { - // using TClient = NYdb::NQuery::TQueryClient; - // using TResult = NYdb::NQuery::TAsyncExecuteQueryResult; - // }; - // DoMultiThreadMultipleRequestsOnSharedSessions<TTypeHelper>(); - //} - - Y_UNIT_TEST(SessionsServerLimit) { - NKikimrConfig::TAppConfig appConfig; - auto& tableServiceConfig = *appConfig.MutableTableServiceConfig(); - tableServiceConfig.SetSessionsLimitPerNode(2); - - TKikimrWithGrpcAndRootSchema server(appConfig); - - NYdb::TDriver driver(TDriverConfig().SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort())); - NYdb::NTable::TTableClient client(driver); - auto sessionResult = client.CreateSession().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS); - auto session1 = sessionResult.GetSession(); - - sessionResult = client.CreateSession().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS); - auto session2 = sessionResult.GetSession(); - - sessionResult = client.CreateSession().ExtractValueSync(); - sessionResult.GetIssues().PrintTo(Cerr); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::OVERLOADED); - - auto status = session1.Close().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(status.IsTransportError(), false); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - UNIT_ASSERT_VALUES_EQUAL(status.GetStatus(), EStatus::SUCCESS); - - auto result = session2.ExecuteDataQuery(R"___( - SELECT 1; - )___", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); - - sessionResult = client.CreateSession().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS); - - sessionResult = client.CreateSession().ExtractValueSync(); - sessionResult.GetIssues().PrintTo(Cerr); - UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::OVERLOADED); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - } - - Y_UNIT_TEST(SessionsServerLimitWithSessionPool) { - NKikimrConfig::TAppConfig appConfig; - auto& tableServiceConfig = *appConfig.MutableTableServiceConfig(); - tableServiceConfig.SetSessionsLimitPerNode(2); - - TKikimrWithGrpcAndRootSchema server(appConfig); - - NYdb::TDriver driver(TDriverConfig().SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort())); - NYdb::NTable::TTableClient client(driver); - auto sessionResult1 = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(sessionResult1.GetStatus(), EStatus::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1); - auto session1 = sessionResult1.GetSession(); - - auto sessionResult2 = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(sessionResult2.GetStatus(), EStatus::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2); - auto session2 = sessionResult2.GetSession(); - - { - auto sessionResult3 = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(sessionResult3.GetStatus(), EStatus::OVERLOADED); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 3); - } - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2); - - auto status = session1.Close().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(status.IsTransportError(), false); - UNIT_ASSERT_VALUES_EQUAL(status.GetStatus(), EStatus::SUCCESS); - - // Close doesnt free session from user perspective, - // the value of ActiveSessionsCounter will be same after Close() call. - // Probably we want to chenge this contract - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2); - - auto result = session2.ExecuteDataQuery(R"___( - SELECT 1; - )___", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); - - sessionResult1 = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(sessionResult1.GetStatus(), EStatus::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(sessionResult1.GetSession().GetId().empty(), false); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 3); - - auto sessionResult3 = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(sessionResult3.GetStatus(), EStatus::OVERLOADED); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 4); - - auto tmp = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 5); - sessionResult1 = tmp; // here we reset previous created session object, - // so perform close rpc call implicitly and delete it - UNIT_ASSERT_VALUES_EQUAL(sessionResult1.GetStatus(), EStatus::OVERLOADED); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 4); - } - - Y_UNIT_TEST(CloseSessionAfterDriverDtorWithoutSessionPool) { - NKikimrConfig::TAppConfig appConfig; - - TKikimrWithGrpcAndRootSchema server(appConfig); - - std::vector<std::string> sessionIds; - int iterations = 50; - - while (iterations--) { - NYdb::TDriver driver(TDriverConfig().SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort())); - NYdb::NTable::TTableClient client(driver); - auto sessionResult = client.CreateSession().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS); - auto session1 = sessionResult.GetSession(); - sessionIds.push_back(session1.GetId()); - } - - std::shared_ptr<grpc::Channel> channel; - channel = grpc::CreateChannel("localhost:" + ToString(server.GetPort()), grpc::InsecureChannelCredentials()); - auto stub = Ydb::Table::V1::TableService::NewStub(channel); - for (const auto& sessionId : sessionIds) { - grpc::ClientContext context; - Ydb::Table::KeepAliveRequest request; - request.set_session_id(TStringType{sessionId}); - Ydb::Table::KeepAliveResponse response; - auto status = stub->KeepAlive(&context, request, &response); - UNIT_ASSERT(status.ok()); - auto deferred = response.operation(); - UNIT_ASSERT(deferred.ready() == true); - UNIT_ASSERT_VALUES_EQUAL(deferred.status(), Ydb::StatusIds::BAD_SESSION); - } - } - - Y_UNIT_TEST(CloseSessionWithSessionPoolExplicit) { - NKikimrConfig::TAppConfig appConfig; - - TKikimrWithGrpcAndRootSchema server(appConfig); - - std::vector<std::string> sessionIds; - int iterations = 100; - - while (iterations--) { - NYdb::TDriver driver(TDriverConfig().SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort())); - NYdb::NTable::TTableClient client(driver); - //TODO: remove this scope after session tracker implementation - { - auto sessionResult = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1); - UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS); - auto session1 = sessionResult.GetSession(); - sessionIds.push_back(session1.GetId()); - - sessionResult = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2); - UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS); - // Here previous created session will be returnet to session pool - session1 = sessionResult.GetSession(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1); - sessionIds.push_back(session1.GetId()); - } - - if (RandomNumber<ui32>(10) == 5) { - client.Stop().Apply([client](NThreading::TFuture<void> future){ - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - return future; - }).Wait(); - } else { - client.Stop().Wait(); - } - - if (iterations & 4) { - driver.Stop(true); - } - } - - std::shared_ptr<grpc::Channel> channel; - channel = grpc::CreateChannel("localhost:" + ToString(server.GetPort()), grpc::InsecureChannelCredentials()); - auto stub = Ydb::Table::V1::TableService::NewStub(channel); - for (const auto& sessionId : sessionIds) { - grpc::ClientContext context; - Ydb::Table::KeepAliveRequest request; - request.set_session_id(TStringType{sessionId}); - Ydb::Table::KeepAliveResponse response; - auto status = stub->KeepAlive(&context, request, &response); - UNIT_ASSERT(status.ok()); - auto deferred = response.operation(); - UNIT_ASSERT(deferred.ready() == true); - UNIT_ASSERT(deferred.status() == Ydb::StatusIds::BAD_SESSION); - } - } - - Y_UNIT_TEST(CloseSessionWithSessionPoolExplicitDriverStopOnly) { - NKikimrConfig::TAppConfig appConfig; - - TKikimrWithGrpcAndRootSchema server(appConfig); - - std::vector<std::string> sessionIds; - int iterations = 100; - - while (iterations--) { - NYdb::TDriver driver(TDriverConfig().SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort())); - NYdb::NTable::TTableClient client(driver); - //TODO: remove this scope after session tracker implementation - { - auto sessionResult = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1); - UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS); - auto session1 = sessionResult.GetSession(); - sessionIds.push_back(session1.GetId()); - - sessionResult = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2); - UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS); - // Here previous created session will be returnet to session pool - session1 = sessionResult.GetSession(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1); - sessionIds.push_back(session1.GetId()); - } - driver.Stop(true); - } - - std::shared_ptr<grpc::Channel> channel; - channel = grpc::CreateChannel("localhost:" + ToString(server.GetPort()), grpc::InsecureChannelCredentials()); - auto stub = Ydb::Table::V1::TableService::NewStub(channel); - for (const auto& sessionId : sessionIds) { - grpc::ClientContext context; - Ydb::Table::KeepAliveRequest request; - request.set_session_id(TStringType{sessionId}); - Ydb::Table::KeepAliveResponse response; - auto status = stub->KeepAlive(&context, request, &response); - UNIT_ASSERT(status.ok()); - auto deferred = response.operation(); - UNIT_ASSERT(deferred.ready() == true); - UNIT_ASSERT(deferred.status() == Ydb::StatusIds::BAD_SESSION); - } - } - - Y_UNIT_TEST(CloseSessionWithSessionPoolFromDtors) { - NKikimrConfig::TAppConfig appConfig; - - TKikimrWithGrpcAndRootSchema server(appConfig); - - std::vector<std::string> sessionIds; - int iterations = 100; - - while (iterations--) { - NYdb::TDriver driver(TDriverConfig() - .SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort())); - NYdb::NTable::TTableClient client(driver); - //TODO: remove this scope after session tracker implementation - { - auto sessionResult = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1); - UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS); - auto session1 = sessionResult.GetSession(); - sessionIds.push_back(session1.GetId()); - - sessionResult = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2); - UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS); - // Here previous created session will be returnet to session pool - session1 = sessionResult.GetSession(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1); - sessionIds.push_back(session1.GetId()); - } - } - - std::shared_ptr<grpc::Channel> channel; - channel = grpc::CreateChannel("localhost:" + ToString(server.GetPort()), grpc::InsecureChannelCredentials()); - auto stub = Ydb::Table::V1::TableService::NewStub(channel); - for (const auto& sessionId : sessionIds) { - grpc::ClientContext context; - Ydb::Table::KeepAliveRequest request; - request.set_session_id(TStringType{sessionId}); - Ydb::Table::KeepAliveResponse response; - auto status = stub->KeepAlive(&context, request, &response); - UNIT_ASSERT(status.ok()); - auto deferred = response.operation(); - UNIT_ASSERT(deferred.ready() == true); - UNIT_ASSERT(deferred.status() == Ydb::StatusIds::BAD_SESSION); - } - } -} diff --git a/ydb/services/ydb/ya.make b/ydb/services/ydb/ya.make index 0dbb567a899..3c83efe6674 100644 --- a/ydb/services/ydb/ya.make +++ b/ydb/services/ydb/ya.make @@ -39,8 +39,6 @@ END() RECURSE_FOR_TESTS( backup_ut - sdk_sessions_ut - sdk_sessions_pool_ut table_split_ut ut ) |