aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDaniil Cherednik <dcherednik@ydb.tech>2024-12-31 16:33:46 +0100
committerGitHub <noreply@github.com>2024-12-31 16:33:46 +0100
commitec34bf1931395cd79abe9b82777954413ba8ce75 (patch)
tree876de3f7d17ceefe00e19f6d1eb8740a8216e16f
parentacaff68f553828b0b8f93eefc71d89dcd7848e1f (diff)
downloadydb-ec34bf1931395cd79abe9b82777954413ba8ce75.tar.gz
Add check for zero read iterators count after cancelation. (#13135)
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.cpp38
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.h1
-rw-r--r--ydb/core/kqp/ut/query/kqp_limits_ut.cpp6
-rw-r--r--ydb/core/kqp/ut/scan/kqp_scan_ut.cpp1
-rw-r--r--ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp1
-rw-r--r--ydb/core/kqp/ut/service/kqp_service_ut.cpp1
-rw-r--r--ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp5
7 files changed, 53 insertions, 0 deletions
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
index ed075e40b1..45601cb9d4 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
@@ -1393,6 +1393,44 @@ void WaitForZeroSessions(const NKqp::TKqpCounters& counters) {
UNIT_ASSERT_C(count, "Unable to wait for proper active session count, it looks like cancelation doesn`t work");
}
+void WaitForZeroReadIterators(Tests::TServer& server, const TString& path) {
+ int iterators = 0;
+ static const TString counterName = "DataShard/ReadIteratorsCount";
+
+ for (int i = 0; i < 10; i++, Sleep(TDuration::Seconds(1))) {
+ TTestActorRuntime* runtime = server.GetRuntime();
+ auto sender = runtime->AllocateEdgeActor();
+ auto shards = GetTableShards(&server, sender, path);
+ UNIT_ASSERT_C(shards.size() > 0, "Table: " << path << " has no shards");
+ iterators = 0;
+ for (auto x : shards) {
+ runtime->SendToPipe(
+ x,
+ sender,
+ new TEvTablet::TEvGetCounters,
+ 0,
+ GetPipeConfigWithRetries());
+
+ auto ev = runtime->GrabEdgeEvent<TEvTablet::TEvGetCountersResponse>(sender);
+ UNIT_ASSERT(ev);
+
+ const NKikimrTabletBase::TEvGetCountersResponse& resp = ev->Get()->Record;
+ for (const auto& counter : resp.GetTabletCounters().GetAppCounters().GetSimpleCounters()) {
+ if (counterName != counter.GetName()) {
+ continue;
+ }
+
+ iterators += counter.GetValue();
+ }
+ }
+ if (iterators == 0) {
+ break;
+ }
+ }
+
+ UNIT_ASSERT_C(iterators == 0, "Unable to wait for proper read iterator count, it looks like cancelation doesn`t work (" << iterators << ")");
+}
+
NJson::TJsonValue SimplifyPlan(NJson::TJsonValue& opt, const TGetPlanParams& params) {
if (auto ops = opt.GetMapSafe().find("Operators"); ops != opt.GetMapSafe().end()) {
auto opName = ops->second.GetArraySafe()[0].GetMapSafe().at("Name").GetStringSafe();
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h
index 5352993653..5def5fbe85 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.h
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.h
@@ -372,6 +372,7 @@ TVector<ui64> GetTableShards(Tests::TServer::TPtr server, TActorId sender, const
TVector<ui64> GetColumnTableShards(Tests::TServer* server, TActorId sender, const TString &path);
void WaitForZeroSessions(const NKqp::TKqpCounters& counters);
+void WaitForZeroReadIterators(Tests::TServer& server, const TString& path);
bool JoinOrderAndAlgosMatch(const TString& optimized, const TString& reference);
diff --git a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp
index bca856f0cf..27447ff6e6 100644
--- a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp
+++ b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp
@@ -759,6 +759,7 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
}
WaitForZeroSessions(counters);
+ WaitForZeroReadIterators(kikimr.GetTestServer(), "/Root/EightShard");
}
void DoCancelAfterRo(bool follower, bool streamLookup, bool dependedRead) {
@@ -869,6 +870,11 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
UNIT_ASSERT(wasCanceled);
}
WaitForZeroSessions(counters);
+
+ WaitForZeroReadIterators(kikimr.GetTestServer(), "/Root/EightShard");
+ if (follower) {
+ WaitForZeroReadIterators(kikimr.GetTestServer(), "/Root/OneShardWithFolower");
+ }
}
Y_UNIT_TEST(CancelAfterRoTx) {
diff --git a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
index b14c1268a9..db5dbc5239 100644
--- a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
+++ b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
@@ -168,6 +168,7 @@ Y_UNIT_TEST_SUITE(KqpScan) {
}
WaitForZeroSessions(counters);
+ WaitForZeroReadIterators(kikimr.GetTestServer(), "/Root/EightShard");
}
Y_UNIT_TEST(IsNull) {
diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
index e28ce4b4a8..db0476149f 100644
--- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
@@ -2351,6 +2351,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
}
WaitForZeroSessions(counters);
+ WaitForZeroReadIterators(kikimr.GetTestServer(), "/Root/EightShard");
for (const auto& service: kikimr.GetTestServer().GetGRpcServer().GetServices()) {
UNIT_ASSERT_VALUES_EQUAL(service->RequestsInProgress(), 0);
diff --git a/ydb/core/kqp/ut/service/kqp_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_service_ut.cpp
index 18333d025b..4882a41284 100644
--- a/ydb/core/kqp/ut/service/kqp_service_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_service_ut.cpp
@@ -153,6 +153,7 @@ Y_UNIT_TEST_SUITE(KqpService) {
tx = result.GetTransaction();
}
}, 0, SessionsCount + 1, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY);
+ WaitForZeroReadIterators(kikimr->GetTestServer(), "/Root/EightShard");
}
TVector<TAsyncDataQueryResult> simulateSessionBusy(ui32 count, TSession& session) {
diff --git a/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp b/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp
index 4876dd438a..7c5399a8b4 100644
--- a/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp
+++ b/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp
@@ -732,6 +732,7 @@ Y_UNIT_TEST_SUITE(KqpScripting) {
}
UNIT_ASSERT(unsuccessStatus);
WaitForZeroSessions(counters);
+ WaitForZeroReadIterators(kikimr.GetTestServer(), "/Root/EightShard");
}
void DoStreamExecuteYqlScriptTimeoutBruteForce(bool clientTimeout, bool operationTimeout) {
@@ -774,6 +775,7 @@ Y_UNIT_TEST_SUITE(KqpScripting) {
}
WaitForZeroSessions(counters);
+ WaitForZeroReadIterators(kikimr.GetTestServer(), "/Root/EightShard");
}
Y_UNIT_TEST(StreamExecuteYqlScriptScanCancelAfterBruteForce) {
@@ -816,6 +818,7 @@ Y_UNIT_TEST_SUITE(KqpScripting) {
}
WaitForZeroSessions(counters);
+ WaitForZeroReadIterators(kikimr.GetTestServer(), "/Root/EightShard");
}
// Check in case of CANCELED status we have no made changes in the table
@@ -866,6 +869,7 @@ Y_UNIT_TEST_SUITE(KqpScripting) {
}
WaitForZeroSessions(counters);
+ WaitForZeroReadIterators(kikimr.GetTestServer(), "/Root/EightShard");
}
Y_UNIT_TEST(StreamExecuteYqlScriptWriteCancelAfterBruteForced) {
@@ -905,6 +909,7 @@ Y_UNIT_TEST_SUITE(KqpScripting) {
}
WaitForZeroSessions(counters);
+ WaitForZeroReadIterators(kikimr.GetTestServer(), "/Root/EightShard");
}
Y_UNIT_TEST(StreamExecuteYqlScriptScanClientTimeoutBruteForce) {