summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitalii Gridnev <[email protected]>2025-09-25 22:35:06 +0300
committerGitHub <[email protected]>2025-09-25 22:35:06 +0300
commitbaebf65595da05b59f09d476cd49dcfb6c0c7dc0 (patch)
tree2097a935c3c3d102eda82518451b5f2e05c1c3c9
parentbb7d110ebf05cdff0b12eeed9a3edc9ebec103b4 (diff)
move out slow tests to separate file (#25824)
-rw-r--r--.github/config/muted_ya.txt4
-rw-r--r--ydb/core/kqp/ut/close_with_load/kqp_cwl.cpp111
-rw-r--r--ydb/core/kqp/ut/close_with_load/kqp_cwl_qs.cpp123
-rw-r--r--ydb/core/kqp/ut/close_with_load/ya.make33
-rw-r--r--ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp92
-rw-r--r--ydb/core/kqp/ut/service/kqp_service_ut.cpp90
-rw-r--r--ydb/core/kqp/ut/ya.make1
7 files changed, 270 insertions, 184 deletions
diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt
index 731fd845928..605f6e441e6 100644
--- a/.github/config/muted_ya.txt
+++ b/.github/config/muted_ya.txt
@@ -56,10 +56,10 @@ ydb/core/kqp/ut/scan KqpScan.StreamExecuteScanQueryClientTimeoutBruteForce
ydb/core/kqp/ut/scheme KqpAcl.AclTemporary-IsOlap+UseAdmin
ydb/core/kqp/ut/scheme KqpAcl.AlterDatabasePrivilegesRequiredToChangeSchemeLimits-AsClusterAdmin
ydb/core/kqp/ut/scheme KqpOlapScheme.AddPgColumnWithStore
-ydb/core/kqp/ut/service KqpQueryService.CloseSessionsWithLoad
+ydb/core/kqp/ut/close_with_load KqpQueryService.CloseSessionsWithLoad
ydb/core/kqp/ut/service KqpQueryService.LargeUpsert-UseSink
ydb/core/kqp/ut/service KqpQueryServiceScripts.ExecuteScriptWithCancelAfterAndTimeout
-ydb/core/kqp/ut/service KqpService.CloseSessionsWithLoad
+ydb/core/kqp/ut/close_with_load KqpService.CloseSessionsWithLoad
ydb/core/kqp/ut/service KqpService.TwoNodeOneShuttingDown
ydb/core/kqp/ut/service unittest.[*/*] chunk
ydb/core/kqp/ut/yql KqpScripting.StreamExecuteYqlScriptScanOperationTmeoutBruteForce
diff --git a/ydb/core/kqp/ut/close_with_load/kqp_cwl.cpp b/ydb/core/kqp/ut/close_with_load/kqp_cwl.cpp
new file mode 100644
index 00000000000..40c8941f75b
--- /dev/null
+++ b/ydb/core/kqp/ut/close_with_load/kqp_cwl.cpp
@@ -0,0 +1,111 @@
+#include <ydb/core/kqp/counters/kqp_counters.h>
+#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
+#include <ydb/core/kqp/common/shutdown/state.h>
+#include <ydb/core/kqp/common/events/events.h>
+#include <ydb/core/kqp/common/shutdown/controller.h>
+#include <ydb/core/kqp/node_service/kqp_node_service.h>
+#include <ydb/core/base/counters.h>
+
+#include <library/cpp/threading/local_executor/local_executor.h>
+#include <ydb/core/tx/datashard/datashard_failpoints.h>
+
+namespace NKikimr {
+namespace NKqp {
+
+using namespace NYdb;
+using namespace NYdb::NTable;
+
+Y_UNIT_TEST_SUITE(KqpService) {
+
+ Y_UNIT_TEST(CloseSessionsWithLoad) {
+ auto kikimr = std::make_shared<TKikimrRunner>();
+ kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_DEBUG);
+ kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_SESSION, NLog::PRI_DEBUG);
+ kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPILE_ACTOR, NLog::PRI_DEBUG);
+ kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPILE_SERVICE, NLog::PRI_DEBUG);
+
+ auto db = kikimr->GetTableClient();
+
+ const ui32 SessionsCount = 50;
+ const TDuration WaitDuration = TDuration::Seconds(1);
+
+ TVector<TSession> sessions;
+ for (ui32 i = 0; i < SessionsCount; ++i) {
+ auto sessionResult = db.CreateSession().GetValueSync();
+ UNIT_ASSERT_C(sessionResult.IsSuccess(), sessionResult.GetIssues().ToString());
+
+ sessions.push_back(sessionResult.GetSession());
+ }
+
+ NPar::LocalExecutor().RunAdditionalThreads(SessionsCount + 1);
+ NPar::LocalExecutor().ExecRange([&kikimr, sessions, WaitDuration](int id) mutable {
+ if (id == (i32)sessions.size()) {
+ Sleep(WaitDuration);
+ Cerr << "start sessions close....." << Endl;
+ for (ui32 i = 0; i < sessions.size(); ++i) {
+ sessions[i].Close();
+ }
+
+ Cerr << "finished sessions close....." << Endl;
+ auto counters = GetServiceCounters(kikimr->GetTestServer().GetRuntime()->GetAppData(0).Counters, "ydb");
+
+ ui64 pendingCompilations = 0;
+ do {
+ Sleep(WaitDuration);
+ pendingCompilations = counters->GetNamedCounter("name", "table.query.compilation.active_count", false)->Val();
+ Cerr << "still compiling... " << pendingCompilations << Endl;
+ } while (pendingCompilations != 0);
+
+ ui64 pendingSessions = 0;
+ do {
+ Sleep(WaitDuration);
+ pendingSessions = counters->GetNamedCounter("name", "table.session.active_count", false)->Val();
+ Cerr << "still active sessions ... " << pendingSessions << Endl;
+ } while (pendingSessions != 0);
+
+ Sleep(TDuration::Seconds(5));
+
+ return;
+ }
+
+ auto session = sessions[id];
+ std::optional<TTransaction> tx;
+
+ while (true) {
+ if (tx) {
+ auto result = tx->Commit().GetValueSync();
+ if (!result.IsSuccess()) {
+ return;
+ }
+
+ tx = {};
+ continue;
+ }
+
+ auto query = Sprintf(R"(
+ SELECT Key, Text, Data FROM `/Root/EightShard` WHERE Key=%1$d + 0;
+ SELECT Key, Data, Text FROM `/Root/EightShard` WHERE Key=%1$d + 1;
+ SELECT Text, Key, Data FROM `/Root/EightShard` WHERE Key=%1$d + 2;
+ SELECT Text, Data, Key FROM `/Root/EightShard` WHERE Key=%1$d + 3;
+ SELECT Data, Key, Text FROM `/Root/EightShard` WHERE Key=%1$d + 4;
+ SELECT Data, Text, Key FROM `/Root/EightShard` WHERE Key=%1$d + 5;
+
+ UPSERT INTO `/Root/EightShard` (Key, Text) VALUES
+ (%2$dul, "New");
+ )", RandomNumber<ui32>(), RandomNumber<ui32>());
+
+ auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx()).GetValueSync();
+ if (!result.IsSuccess()) {
+ Sleep(TDuration::Seconds(5));
+ Cerr << "received non-success status for session " << id << Endl;
+ return;
+ }
+
+ tx = result.GetTransaction();
+ }
+ }, 0, SessionsCount + 1, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY);
+ WaitForZeroReadIterators(kikimr->GetTestServer(), "/Root/EightShard");
+ }
+}
+}
+} \ No newline at end of file
diff --git a/ydb/core/kqp/ut/close_with_load/kqp_cwl_qs.cpp b/ydb/core/kqp/ut/close_with_load/kqp_cwl_qs.cpp
new file mode 100644
index 00000000000..a281642a794
--- /dev/null
+++ b/ydb/core/kqp/ut/close_with_load/kqp_cwl_qs.cpp
@@ -0,0 +1,123 @@
+#include <ydb/core/kqp/counters/kqp_counters.h>
+#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
+#include <ydb/core/kqp/ut/common/columnshard.h>
+#include <ydb/core/testlib/common_helper.h>
+#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
+#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
+#include <ydb/public/lib/ut_helpers/ut_helpers_query.h>
+#include <ydb/core/base/tablet_pipecache.h>
+#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/operation/operation.h>
+#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/proto/accessor.h>
+#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/exceptions/exceptions.h>
+#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/operation/operation.h>
+
+#include <ydb/core/kqp/counters/kqp_counters.h>
+#include <ydb/core/base/counters.h>
+#include <library/cpp/threading/local_executor/local_executor.h>
+
+#include <fmt/format.h>
+
+namespace NKikimr {
+namespace NKqp {
+
+using namespace NYdb;
+using namespace NYdb::NQuery;
+using namespace fmt::literals;
+
+Y_UNIT_TEST_SUITE(KqpQueryService) {
+
+ // Copy paste from table service but with some modifications for query service
+ // Checks read iterators/session/sdk counters have expected values
+ Y_UNIT_TEST(CloseSessionsWithLoad) {
+ auto kikimr = std::make_shared<TKikimrRunner>();
+ kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_DEBUG);
+ kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_SESSION, NLog::PRI_DEBUG);
+ kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPILE_ACTOR, NLog::PRI_DEBUG);
+ kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPILE_SERVICE, NLog::PRI_DEBUG);
+
+ NYdb::NQuery::TQueryClient db = kikimr->GetQueryClient();
+
+ const ui32 SessionsCount = 50;
+ const TDuration WaitDuration = TDuration::Seconds(1);
+
+ TVector<NYdb::NQuery::TQueryClient::TSession> sessions;
+ for (ui32 i = 0; i < SessionsCount; ++i) {
+ auto sessionResult = db.GetSession().GetValueSync();
+ UNIT_ASSERT_C(sessionResult.IsSuccess(), sessionResult.GetIssues().ToString());
+
+ sessions.push_back(sessionResult.GetSession());
+ }
+
+ NPar::LocalExecutor().RunAdditionalThreads(SessionsCount + 1);
+ NPar::LocalExecutor().ExecRange([&kikimr, sessions, WaitDuration](int id) mutable {
+ if (id == (i32)sessions.size()) {
+ Sleep(WaitDuration);
+ Cerr << "start sessions close....." << Endl;
+ auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr->GetEndpoint());
+ for (ui32 i = 0; i < sessions.size(); ++i) {
+ bool allDoneOk = true;
+ NTestHelpers::CheckDelete(clientConfig, TString{sessions[i].GetId()}, Ydb::StatusIds::SUCCESS, allDoneOk);
+ UNIT_ASSERT(allDoneOk);
+ }
+
+ Cerr << "finished sessions close....." << Endl;
+ auto counters = GetServiceCounters(kikimr->GetTestServer().GetRuntime()->GetAppData(0).Counters, "ydb");
+
+ ui64 pendingCompilations = 0;
+ do {
+ Sleep(WaitDuration);
+ pendingCompilations = counters->GetNamedCounter("name", "table.query.compilation.active_count", false)->Val();
+ Cerr << "still compiling... " << pendingCompilations << Endl;
+ } while (pendingCompilations != 0);
+
+ ui64 pendingSessions = 0;
+ do {
+ Sleep(WaitDuration);
+ pendingSessions = counters->GetNamedCounter("name", "table.session.active_count", false)->Val();
+ Cerr << "still active sessions ... " << pendingSessions << Endl;
+ } while (pendingSessions != 0);
+
+ return;
+ }
+
+ auto session = sessions[id];
+ std::optional<TTransaction> tx;
+
+ while (true) {
+ if (tx) {
+ auto result = tx->Commit().GetValueSync();
+ if (!result.IsSuccess()) {
+ return;
+ }
+
+ tx = {};
+ continue;
+ }
+
+ auto query = Sprintf(R"(
+ SELECT Key, Text, Data FROM `/Root/EightShard` WHERE Key=%1$d + 0;
+ SELECT Key, Data, Text FROM `/Root/EightShard` WHERE Key=%1$d + 1;
+ SELECT Text, Key, Data FROM `/Root/EightShard` WHERE Key=%1$d + 2;
+ SELECT Text, Data, Key FROM `/Root/EightShard` WHERE Key=%1$d + 3;
+ SELECT Data, Key, Text FROM `/Root/EightShard` WHERE Key=%1$d + 4;
+ SELECT Data, Text, Key FROM `/Root/EightShard` WHERE Key=%1$d + 5;
+
+ UPSERT INTO `/Root/EightShard` (Key, Text) VALUES
+ (%2$dul, "New");
+ )", RandomNumber<ui32>(), RandomNumber<ui32>());
+
+ auto result = session.ExecuteQuery(query, TTxControl::BeginTx()).GetValueSync();
+ if (!result.IsSuccess()) {
+ UNIT_ASSERT_C(IsIn({EStatus::BAD_SESSION, EStatus::CANCELLED}, result.GetStatus()), result.GetIssues().ToString());
+ Cerr << "received non-success status for session " << id << Endl;
+ return;
+ }
+
+ tx = result.GetTransaction();
+ }
+ }, 0, SessionsCount + 1, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY);
+ WaitForZeroReadIterators(kikimr->GetTestServer(), "/Root/EightShard");
+ }
+}
+}
+} \ No newline at end of file
diff --git a/ydb/core/kqp/ut/close_with_load/ya.make b/ydb/core/kqp/ut/close_with_load/ya.make
new file mode 100644
index 00000000000..b059ec5ad84
--- /dev/null
+++ b/ydb/core/kqp/ut/close_with_load/ya.make
@@ -0,0 +1,33 @@
+UNITTEST_FOR(ydb/core/kqp)
+
+FORK_SUBTESTS()
+SPLIT_FACTOR(50)
+
+IF (WITH_VALGRIND)
+ SIZE(LARGE)
+ TAG(ya:fat)
+ELSE()
+ SIZE(MEDIUM)
+ENDIF()
+
+SRCS(
+ kqp_cwl.cpp
+ kqp_cwl_qs.cpp
+)
+
+PEERDIR(
+ contrib/libs/fmt
+ library/cpp/threading/local_executor
+ ydb/core/kqp
+ ydb/core/kqp/ut/common
+ ydb/core/tx/columnshard/hooks/testing
+ yql/essentials/sql/pg
+ yql/essentials/parser/pg_wrapper
+ ydb/public/lib/ut_helpers
+ ydb/public/sdk/cpp/src/client/operation
+ ydb/public/sdk/cpp/src/client/types/operation
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
index ef05bcba8f0..5a16c8057b9 100644
--- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
@@ -113,98 +113,6 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
DoClosedSessionRemovedWhileActiveTest(false);
}
*/
- // Copy paste from table service but with some modifications for query service
- // Checks read iterators/session/sdk counters have expected values
- Y_UNIT_TEST(CloseSessionsWithLoad) {
- auto kikimr = std::make_shared<TKikimrRunner>();
- kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_DEBUG);
- kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_SESSION, NLog::PRI_DEBUG);
- kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPILE_ACTOR, NLog::PRI_DEBUG);
- kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPILE_SERVICE, NLog::PRI_DEBUG);
-
- NYdb::NQuery::TQueryClient db = kikimr->GetQueryClient();
-
- const ui32 SessionsCount = 50;
- const TDuration WaitDuration = TDuration::Seconds(1);
-
- TVector<NYdb::NQuery::TQueryClient::TSession> sessions;
- for (ui32 i = 0; i < SessionsCount; ++i) {
- auto sessionResult = db.GetSession().GetValueSync();
- UNIT_ASSERT_C(sessionResult.IsSuccess(), sessionResult.GetIssues().ToString());
-
- sessions.push_back(sessionResult.GetSession());
- }
-
- NPar::LocalExecutor().RunAdditionalThreads(SessionsCount + 1);
- NPar::LocalExecutor().ExecRange([&kikimr, sessions, WaitDuration](int id) mutable {
- if (id == (i32)sessions.size()) {
- Sleep(WaitDuration);
- Cerr << "start sessions close....." << Endl;
- auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr->GetEndpoint());
- for (ui32 i = 0; i < sessions.size(); ++i) {
- bool allDoneOk = true;
- NTestHelpers::CheckDelete(clientConfig, TString{sessions[i].GetId()}, Ydb::StatusIds::SUCCESS, allDoneOk);
- UNIT_ASSERT(allDoneOk);
- }
-
- Cerr << "finished sessions close....." << Endl;
- auto counters = GetServiceCounters(kikimr->GetTestServer().GetRuntime()->GetAppData(0).Counters, "ydb");
-
- ui64 pendingCompilations = 0;
- do {
- Sleep(WaitDuration);
- pendingCompilations = counters->GetNamedCounter("name", "table.query.compilation.active_count", false)->Val();
- Cerr << "still compiling... " << pendingCompilations << Endl;
- } while (pendingCompilations != 0);
-
- ui64 pendingSessions = 0;
- do {
- Sleep(WaitDuration);
- pendingSessions = counters->GetNamedCounter("name", "table.session.active_count", false)->Val();
- Cerr << "still active sessions ... " << pendingSessions << Endl;
- } while (pendingSessions != 0);
-
- return;
- }
-
- auto session = sessions[id];
- std::optional<TTransaction> tx;
-
- while (true) {
- if (tx) {
- auto result = tx->Commit().GetValueSync();
- if (!result.IsSuccess()) {
- return;
- }
-
- tx = {};
- continue;
- }
-
- auto query = Sprintf(R"(
- SELECT Key, Text, Data FROM `/Root/EightShard` WHERE Key=%1$d + 0;
- SELECT Key, Data, Text FROM `/Root/EightShard` WHERE Key=%1$d + 1;
- SELECT Text, Key, Data FROM `/Root/EightShard` WHERE Key=%1$d + 2;
- SELECT Text, Data, Key FROM `/Root/EightShard` WHERE Key=%1$d + 3;
- SELECT Data, Key, Text FROM `/Root/EightShard` WHERE Key=%1$d + 4;
- SELECT Data, Text, Key FROM `/Root/EightShard` WHERE Key=%1$d + 5;
-
- UPSERT INTO `/Root/EightShard` (Key, Text) VALUES
- (%2$dul, "New");
- )", RandomNumber<ui32>(), RandomNumber<ui32>());
-
- auto result = session.ExecuteQuery(query, TTxControl::BeginTx()).GetValueSync();
- if (!result.IsSuccess()) {
- UNIT_ASSERT_C(IsIn({EStatus::BAD_SESSION, EStatus::CANCELLED}, result.GetStatus()), result.GetIssues().ToString());
- Cerr << "received non-success status for session " << id << Endl;
- return;
- }
-
- tx = result.GetTransaction();
- }
- }, 0, SessionsCount + 1, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY);
- WaitForZeroReadIterators(kikimr->GetTestServer(), "/Root/EightShard");
- }
Y_UNIT_TEST(PeriodicTaskInSessionPool) {
auto kikimr = DefaultKikimrRunner();
diff --git a/ydb/core/kqp/ut/service/kqp_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_service_ut.cpp
index b39553996be..d82ac48c10e 100644
--- a/ydb/core/kqp/ut/service/kqp_service_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_service_ut.cpp
@@ -130,96 +130,6 @@ Y_UNIT_TEST_SUITE(KqpService) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::CANCELLED, result.GetIssues().ToString());
}
- Y_UNIT_TEST(CloseSessionsWithLoad) {
- auto kikimr = std::make_shared<TKikimrRunner>();
- kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_DEBUG);
- kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_SESSION, NLog::PRI_DEBUG);
- kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPILE_ACTOR, NLog::PRI_DEBUG);
- kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPILE_SERVICE, NLog::PRI_DEBUG);
-
- auto db = kikimr->GetTableClient();
-
- const ui32 SessionsCount = 50;
- const TDuration WaitDuration = TDuration::Seconds(1);
-
- TVector<TSession> sessions;
- for (ui32 i = 0; i < SessionsCount; ++i) {
- auto sessionResult = db.CreateSession().GetValueSync();
- UNIT_ASSERT_C(sessionResult.IsSuccess(), sessionResult.GetIssues().ToString());
-
- sessions.push_back(sessionResult.GetSession());
- }
-
- NPar::LocalExecutor().RunAdditionalThreads(SessionsCount + 1);
- NPar::LocalExecutor().ExecRange([&kikimr, sessions, WaitDuration](int id) mutable {
- if (id == (i32)sessions.size()) {
- Sleep(WaitDuration);
- Cerr << "start sessions close....." << Endl;
- for (ui32 i = 0; i < sessions.size(); ++i) {
- sessions[i].Close();
- }
-
- Cerr << "finished sessions close....." << Endl;
- auto counters = GetServiceCounters(kikimr->GetTestServer().GetRuntime()->GetAppData(0).Counters, "ydb");
-
- ui64 pendingCompilations = 0;
- do {
- Sleep(WaitDuration);
- pendingCompilations = counters->GetNamedCounter("name", "table.query.compilation.active_count", false)->Val();
- Cerr << "still compiling... " << pendingCompilations << Endl;
- } while (pendingCompilations != 0);
-
- ui64 pendingSessions = 0;
- do {
- Sleep(WaitDuration);
- pendingSessions = counters->GetNamedCounter("name", "table.session.active_count", false)->Val();
- Cerr << "still active sessions ... " << pendingSessions << Endl;
- } while (pendingSessions != 0);
-
- Sleep(TDuration::Seconds(5));
-
- return;
- }
-
- auto session = sessions[id];
- std::optional<TTransaction> tx;
-
- while (true) {
- if (tx) {
- auto result = tx->Commit().GetValueSync();
- if (!result.IsSuccess()) {
- return;
- }
-
- tx = {};
- continue;
- }
-
- auto query = Sprintf(R"(
- SELECT Key, Text, Data FROM `/Root/EightShard` WHERE Key=%1$d + 0;
- SELECT Key, Data, Text FROM `/Root/EightShard` WHERE Key=%1$d + 1;
- SELECT Text, Key, Data FROM `/Root/EightShard` WHERE Key=%1$d + 2;
- SELECT Text, Data, Key FROM `/Root/EightShard` WHERE Key=%1$d + 3;
- SELECT Data, Key, Text FROM `/Root/EightShard` WHERE Key=%1$d + 4;
- SELECT Data, Text, Key FROM `/Root/EightShard` WHERE Key=%1$d + 5;
-
- UPSERT INTO `/Root/EightShard` (Key, Text) VALUES
- (%2$dul, "New");
- )", RandomNumber<ui32>(), RandomNumber<ui32>());
-
- auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx()).GetValueSync();
- if (!result.IsSuccess()) {
- Sleep(TDuration::Seconds(5));
- Cerr << "received non-success status for session " << id << Endl;
- return;
- }
-
- tx = result.GetTransaction();
- }
- }, 0, SessionsCount + 1, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY);
- WaitForZeroReadIterators(kikimr->GetTestServer(), "/Root/EightShard");
- }
-
TVector<TAsyncDataQueryResult> simulateSessionBusy(ui32 count, TSession& session) {
TVector<TAsyncDataQueryResult> futures;
for (ui32 i = 0; i < count; ++i) {
diff --git a/ydb/core/kqp/ut/ya.make b/ydb/core/kqp/ut/ya.make
index e3b86478f44..d8b85f8b7f8 100644
--- a/ydb/core/kqp/ut/ya.make
+++ b/ydb/core/kqp/ut/ya.make
@@ -1,6 +1,7 @@
RECURSE_FOR_TESTS(
arrow
batch_operations
+ close_with_load
cost
data
data_integrity