diff options
author | Daniil Cherednik <dcherednik@ydb.tech> | 2024-12-31 16:33:46 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-31 16:33:46 +0100 |
commit | ec34bf1931395cd79abe9b82777954413ba8ce75 (patch) | |
tree | 876de3f7d17ceefe00e19f6d1eb8740a8216e16f | |
parent | acaff68f553828b0b8f93eefc71d89dcd7848e1f (diff) | |
download | ydb-ec34bf1931395cd79abe9b82777954413ba8ce75.tar.gz |
Add check for zero read iterators count after cancelation. (#13135)
-rw-r--r-- | ydb/core/kqp/ut/common/kqp_ut_common.cpp | 38 | ||||
-rw-r--r-- | ydb/core/kqp/ut/common/kqp_ut_common.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/ut/query/kqp_limits_ut.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kqp/ut/scan/kqp_scan_ut.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/ut/service/kqp_service_ut.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp | 5 |
7 files changed, 53 insertions, 0 deletions
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp index ed075e40b1..45601cb9d4 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp +++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp @@ -1393,6 +1393,44 @@ void WaitForZeroSessions(const NKqp::TKqpCounters& counters) { UNIT_ASSERT_C(count, "Unable to wait for proper active session count, it looks like cancelation doesn`t work"); } +void WaitForZeroReadIterators(Tests::TServer& server, const TString& path) { + int iterators = 0; + static const TString counterName = "DataShard/ReadIteratorsCount"; + + for (int i = 0; i < 10; i++, Sleep(TDuration::Seconds(1))) { + TTestActorRuntime* runtime = server.GetRuntime(); + auto sender = runtime->AllocateEdgeActor(); + auto shards = GetTableShards(&server, sender, path); + UNIT_ASSERT_C(shards.size() > 0, "Table: " << path << " has no shards"); + iterators = 0; + for (auto x : shards) { + runtime->SendToPipe( + x, + sender, + new TEvTablet::TEvGetCounters, + 0, + GetPipeConfigWithRetries()); + + auto ev = runtime->GrabEdgeEvent<TEvTablet::TEvGetCountersResponse>(sender); + UNIT_ASSERT(ev); + + const NKikimrTabletBase::TEvGetCountersResponse& resp = ev->Get()->Record; + for (const auto& counter : resp.GetTabletCounters().GetAppCounters().GetSimpleCounters()) { + if (counterName != counter.GetName()) { + continue; + } + + iterators += counter.GetValue(); + } + } + if (iterators == 0) { + break; + } + } + + UNIT_ASSERT_C(iterators == 0, "Unable to wait for proper read iterator count, it looks like cancelation doesn`t work (" << iterators << ")"); +} + NJson::TJsonValue SimplifyPlan(NJson::TJsonValue& opt, const TGetPlanParams& params) { if (auto ops = opt.GetMapSafe().find("Operators"); ops != opt.GetMapSafe().end()) { auto opName = ops->second.GetArraySafe()[0].GetMapSafe().at("Name").GetStringSafe(); diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h index 5352993653..5def5fbe85 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.h +++ b/ydb/core/kqp/ut/common/kqp_ut_common.h @@ -372,6 +372,7 @@ TVector<ui64> GetTableShards(Tests::TServer::TPtr server, TActorId sender, const TVector<ui64> GetColumnTableShards(Tests::TServer* server, TActorId sender, const TString &path); void WaitForZeroSessions(const NKqp::TKqpCounters& counters); +void WaitForZeroReadIterators(Tests::TServer& server, const TString& path); bool JoinOrderAndAlgosMatch(const TString& optimized, const TString& reference); diff --git a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp index bca856f0cf..27447ff6e6 100644 --- a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp @@ -759,6 +759,7 @@ Y_UNIT_TEST_SUITE(KqpLimits) { } WaitForZeroSessions(counters); + WaitForZeroReadIterators(kikimr.GetTestServer(), "/Root/EightShard"); } void DoCancelAfterRo(bool follower, bool streamLookup, bool dependedRead) { @@ -869,6 +870,11 @@ Y_UNIT_TEST_SUITE(KqpLimits) { UNIT_ASSERT(wasCanceled); } WaitForZeroSessions(counters); + + WaitForZeroReadIterators(kikimr.GetTestServer(), "/Root/EightShard"); + if (follower) { + WaitForZeroReadIterators(kikimr.GetTestServer(), "/Root/OneShardWithFolower"); + } } Y_UNIT_TEST(CancelAfterRoTx) { diff --git a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp index b14c1268a9..db5dbc5239 100644 --- a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp +++ b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp @@ -168,6 +168,7 @@ Y_UNIT_TEST_SUITE(KqpScan) { } WaitForZeroSessions(counters); + WaitForZeroReadIterators(kikimr.GetTestServer(), "/Root/EightShard"); } Y_UNIT_TEST(IsNull) { diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index e28ce4b4a8..db0476149f 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -2351,6 +2351,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { } WaitForZeroSessions(counters); + WaitForZeroReadIterators(kikimr.GetTestServer(), "/Root/EightShard"); for (const auto& service: kikimr.GetTestServer().GetGRpcServer().GetServices()) { UNIT_ASSERT_VALUES_EQUAL(service->RequestsInProgress(), 0); diff --git a/ydb/core/kqp/ut/service/kqp_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_service_ut.cpp index 18333d025b..4882a41284 100644 --- a/ydb/core/kqp/ut/service/kqp_service_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_service_ut.cpp @@ -153,6 +153,7 @@ Y_UNIT_TEST_SUITE(KqpService) { tx = result.GetTransaction(); } }, 0, SessionsCount + 1, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY); + WaitForZeroReadIterators(kikimr->GetTestServer(), "/Root/EightShard"); } TVector<TAsyncDataQueryResult> simulateSessionBusy(ui32 count, TSession& session) { diff --git a/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp b/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp index 4876dd438a..7c5399a8b4 100644 --- a/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp +++ b/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp @@ -732,6 +732,7 @@ Y_UNIT_TEST_SUITE(KqpScripting) { } UNIT_ASSERT(unsuccessStatus); WaitForZeroSessions(counters); + WaitForZeroReadIterators(kikimr.GetTestServer(), "/Root/EightShard"); } void DoStreamExecuteYqlScriptTimeoutBruteForce(bool clientTimeout, bool operationTimeout) { @@ -774,6 +775,7 @@ Y_UNIT_TEST_SUITE(KqpScripting) { } WaitForZeroSessions(counters); + WaitForZeroReadIterators(kikimr.GetTestServer(), "/Root/EightShard"); } Y_UNIT_TEST(StreamExecuteYqlScriptScanCancelAfterBruteForce) { @@ -816,6 +818,7 @@ Y_UNIT_TEST_SUITE(KqpScripting) { } WaitForZeroSessions(counters); + WaitForZeroReadIterators(kikimr.GetTestServer(), "/Root/EightShard"); } // Check in case of CANCELED status we have no made changes in the table @@ -866,6 +869,7 @@ Y_UNIT_TEST_SUITE(KqpScripting) { } WaitForZeroSessions(counters); + WaitForZeroReadIterators(kikimr.GetTestServer(), "/Root/EightShard"); } Y_UNIT_TEST(StreamExecuteYqlScriptWriteCancelAfterBruteForced) { @@ -905,6 +909,7 @@ Y_UNIT_TEST_SUITE(KqpScripting) { } WaitForZeroSessions(counters); + WaitForZeroReadIterators(kikimr.GetTestServer(), "/Root/EightShard"); } Y_UNIT_TEST(StreamExecuteYqlScriptScanClientTimeoutBruteForce) { |