diff options
author | dcherednik <dcherednik@ydb.tech> | 2023-04-26 14:31:05 +0300 |
---|---|---|
committer | dcherednik <dcherednik@ydb.tech> | 2023-04-26 14:31:05 +0300 |
commit | 266c0d7abc7bdc48c04254ad716b004a8776d8ce (patch) | |
tree | f431e6be310637a599f25d4a02446a84b1e9d475 | |
parent | 515d9f94063f8a5ca9a676ea37562c595bf16a48 (diff) | |
download | ydb-266c0d7abc7bdc48c04254ad716b004a8776d8ce.tar.gz |
Brute force timeouts tests to check proper cancelation.
-rw-r--r-- | ydb/core/kqp/ut/common/kqp_ut_common.cpp | 23 | ||||
-rw-r--r-- | ydb/core/kqp/ut/common/kqp_ut_common.h | 14 | ||||
-rw-r--r-- | ydb/core/kqp/ut/scan/kqp_scan_ut.cpp | 37 | ||||
-rw-r--r-- | ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp | 47 |
4 files changed, 113 insertions, 8 deletions
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp index 8bbf632565..7b0a7a1614 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp +++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp @@ -580,8 +580,12 @@ void PrintResultSet(const NYdb::TResultSet& resultSet, NYson::TYsonWriter& write } } +bool IsTimeoutError(NYdb::EStatus status) { + return status == NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED || status == NYdb::EStatus::TIMEOUT; +} + template<typename TIterator> -TString StreamResultToYsonImpl(TIterator& it, TVector<TString>* profiles) { +TString StreamResultToYsonImpl(TIterator& it, TVector<TString>* profiles, bool throwOnTimeout = false) { TStringStream out; NYson::TYsonWriter writer(&out, NYson::EYsonFormat::Text, ::NYson::EYsonType::Node, true); writer.OnBeginList(); @@ -591,6 +595,9 @@ TString StreamResultToYsonImpl(TIterator& it, TVector<TString>* profiles) { for (;;) { auto streamPart = it.ReadNext().GetValueSync(); if (!streamPart.IsSuccess()) { + if (throwOnTimeout && IsTimeoutError(streamPart.GetStatus())) { + throw TStreamReadError(streamPart.GetStatus()); + } UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString()); break; } @@ -609,11 +616,11 @@ TString StreamResultToYsonImpl(TIterator& it, TVector<TString>* profiles) { return out.Str(); } -TString StreamResultToYson(NYdb::NTable::TScanQueryPartIterator& it) { - return StreamResultToYsonImpl(it, nullptr); +TString StreamResultToYson(NYdb::NTable::TScanQueryPartIterator& it, bool throwOnTimeout) { + return StreamResultToYsonImpl(it, nullptr, throwOnTimeout); } -TString StreamResultToYson(NYdb::NTable::TTablePartIterator& it) { +TString StreamResultToYson(NYdb::NTable::TTablePartIterator& it, bool throwOnTimeout) { TStringStream out; NYson::TYsonWriter writer(&out, NYson::EYsonFormat::Text, ::NYson::EYsonType::Node, true); writer.OnBeginList(); @@ -623,6 +630,9 @@ TString StreamResultToYson(NYdb::NTable::TTablePartIterator& it) { for (;;) { auto streamPart = it.ReadNext().GetValueSync(); if (!streamPart.IsSuccess()) { + if (throwOnTimeout && IsTimeoutError(streamPart.GetStatus())) { + throw TStreamReadError(streamPart.GetStatus()); + } UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString()); break; } @@ -638,7 +648,7 @@ TString StreamResultToYson(NYdb::NTable::TTablePartIterator& it) { return out.Str(); } -TString StreamResultToYson(NYdb::NScripting::TYqlResultPartIterator& it) { +TString StreamResultToYson(NYdb::NScripting::TYqlResultPartIterator& it, bool throwOnTimeout) { TStringStream out; NYson::TYsonWriter writer(&out, NYson::EYsonFormat::Text, ::NYson::EYsonType::Node, true); writer.OnBeginList(); @@ -650,6 +660,9 @@ TString StreamResultToYson(NYdb::NScripting::TYqlResultPartIterator& it) { for (;;) { auto streamPart = it.ReadNext().GetValueSync(); if (!streamPart.IsSuccess()) { + if (throwOnTimeout && IsTimeoutError(streamPart.GetStatus())) { + throw TStreamReadError(streamPart.GetStatus()); + } UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString()); break; } diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h index 2bc1b5f6d7..b1793deb47 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.h +++ b/ydb/core/kqp/ut/common/kqp_ut_common.h @@ -235,9 +235,17 @@ inline NYdb::NTable::TDataQueryResult ExecQueryAndTestResult(NYdb::NTable::TSess return ExecQueryAndTestResult(session, query, NYdb::TParamsBuilder().Build(), expectedYson); } -TString StreamResultToYson(NYdb::NTable::TScanQueryPartIterator& it); -TString StreamResultToYson(NYdb::NScripting::TYqlResultPartIterator& it); -TString StreamResultToYson(NYdb::NTable::TTablePartIterator& it); +class TStreamReadError : public yexception { +public: + TStreamReadError(NYdb::EStatus status) + : Status(status) + {} + NYdb::EStatus Status; +}; + +TString StreamResultToYson(NYdb::NTable::TScanQueryPartIterator& it, bool throwOnTImeout = false); +TString StreamResultToYson(NYdb::NScripting::TYqlResultPartIterator& it, bool throwOnTImeout = false); +TString StreamResultToYson(NYdb::NTable::TTablePartIterator& it, bool throwOnTImeout = false); ui32 CountPlanNodesByKv(const NJson::TJsonValue& plan, const TString& key, const TString& value); NJson::TJsonValue FindPlanNodeByKv(const NJson::TJsonValue& plan, const TString& key, const TString& value); diff --git a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp index 41959f5bcd..f0612e4aa2 100644 --- a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp +++ b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp @@ -128,6 +128,43 @@ Y_UNIT_TEST_SUITE(KqpScan) { UNIT_ASSERT_C(count, "Unable to wait for proper active session count, it looks like cancelation doesn`t work"); } + Y_UNIT_TEST(StreamExecuteScanQueryTimeoutBruteForce) { + TKikimrRunner kikimr; + NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters); + + int maxTimeoutMs = 100; + + for (int i = 1; i < maxTimeoutMs; i++) { + auto it = kikimr.GetTableClient().StreamExecuteScanQuery(R"( + SELECT * FROM `/Root/EightShard` WHERE Text = "Value1" ORDER BY Key; + )", + TStreamExecScanQuerySettings() + .ClientTimeout(TDuration::MilliSeconds(i)) + ).GetValueSync(); + + if (it.IsSuccess()) { + try { + auto yson = StreamResultToYson(it, true); + CompareYson(R"([[[1];[101u];["Value1"]];[[2];[201u];["Value1"]];[[3];[301u];["Value1"]];[[1];[401u];["Value1"]];[[2];[501u];["Value1"]];[[3];[601u];["Value1"]];[[1];[701u];["Value1"]];[[2];[801u];["Value1"]]])", yson); + } catch (const TStreamReadError& ex) { + UNIT_ASSERT_VALUES_EQUAL(ex.Status, NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED); + } catch (const std::exception& ex) { + UNIT_ASSERT_C(false, "unknown exception during the test"); + } + } else { + UNIT_ASSERT_VALUES_EQUAL(it.GetStatus(), NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED); + } + } + + int count = 60; + while (counters.GetActiveSessionActors()->Val() != 0 && count) { + count--; + Sleep(TDuration::Seconds(1)); + } + + UNIT_ASSERT_C(count, "Unable to wait for proper active session count, it looks like cancelation doesn`t work"); + } + Y_UNIT_TEST(IsNull) { auto kikimr = DefaultKikimrRunner(); CreateNullSampleTables(kikimr); diff --git a/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp b/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp index 5474d8649b..3e4488a6ad 100644 --- a/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp +++ b/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp @@ -563,6 +563,53 @@ Y_UNIT_TEST_SUITE(KqpScripting) { UNIT_ASSERT_C(count, "Unable to wait for proper active session count, it looks like cancelation doesn`t work"); } + Y_UNIT_TEST(StreamExecuteYqlScriptScanTimeoutBruteForce) { + TKikimrRunner kikimr; + NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters); + + TScriptingClient client(kikimr.GetDriver()); + + int maxTimeoutMs = 1000; + + for (int i = 1; i < maxTimeoutMs; i++) { + auto it = client.StreamExecuteYqlScript(R"( + SELECT * FROM `/Root/EightShard` WHERE Text = "Value1" ORDER BY Key; + )", + TExecuteYqlRequestSettings() + .ClientTimeout(TDuration::MilliSeconds(i)) + ).GetValueSync(); + + if (it.IsSuccess()) { + try { + auto yson = StreamResultToYson(it, true); + CompareYson(R"([[[[1];[101u];["Value1"]];[[2];[201u];["Value1"]];[[3];[301u];["Value1"]];[[1];[401u];["Value1"]];[[2];[501u];["Value1"]];[[3];[601u];["Value1"]];[[1];[701u];["Value1"]];[[2];[801u];["Value1"]]]])", yson); + } catch (const TStreamReadError& ex) { + if (ex.Status != NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED && ex.Status != NYdb::EStatus::TIMEOUT) { + TStringStream msg; + msg << "unexpected status: " << ex.Status; + UNIT_ASSERT_C(false, msg.Str().data()); + } + } catch (const std::exception& ex) { + auto msg = TString("unknown exception during the test: ") + ex.what(); + UNIT_ASSERT_C(false, msg.data()); + } + } else { + UNIT_ASSERT_VALUES_EQUAL(it.GetStatus(), NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED); + } + } +// We unable to close worker actor on timeout right now +#if 0 + int count = 60; + while (counters.GetActiveSessionActors()->Val() != 0 && count) { + Cerr << "SESSIONS: " << counters.GetActiveSessionActors()->Val() << Endl; + count--; + Sleep(TDuration::Seconds(1)); + } + + UNIT_ASSERT_C(count, "Unable to wait for proper active session count, it looks like cancelation doesn`t work"); +#endif + } + Y_UNIT_TEST(StreamExecuteYqlScriptScanScalar) { TKikimrRunner kikimr; TScriptingClient client(kikimr.GetDriver()); |