aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordcherednik <dcherednik@ydb.tech>2023-04-26 14:31:05 +0300
committerdcherednik <dcherednik@ydb.tech>2023-04-26 14:31:05 +0300
commit266c0d7abc7bdc48c04254ad716b004a8776d8ce (patch)
treef431e6be310637a599f25d4a02446a84b1e9d475
parent515d9f94063f8a5ca9a676ea37562c595bf16a48 (diff)
downloadydb-266c0d7abc7bdc48c04254ad716b004a8776d8ce.tar.gz
Brute force timeouts tests to check proper cancelation.
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.cpp23
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.h14
-rw-r--r--ydb/core/kqp/ut/scan/kqp_scan_ut.cpp37
-rw-r--r--ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp47
4 files changed, 113 insertions, 8 deletions
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
index 8bbf632565..7b0a7a1614 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
@@ -580,8 +580,12 @@ void PrintResultSet(const NYdb::TResultSet& resultSet, NYson::TYsonWriter& write
}
}
+bool IsTimeoutError(NYdb::EStatus status) {
+ return status == NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED || status == NYdb::EStatus::TIMEOUT;
+}
+
template<typename TIterator>
-TString StreamResultToYsonImpl(TIterator& it, TVector<TString>* profiles) {
+TString StreamResultToYsonImpl(TIterator& it, TVector<TString>* profiles, bool throwOnTimeout = false) {
TStringStream out;
NYson::TYsonWriter writer(&out, NYson::EYsonFormat::Text, ::NYson::EYsonType::Node, true);
writer.OnBeginList();
@@ -591,6 +595,9 @@ TString StreamResultToYsonImpl(TIterator& it, TVector<TString>* profiles) {
for (;;) {
auto streamPart = it.ReadNext().GetValueSync();
if (!streamPart.IsSuccess()) {
+ if (throwOnTimeout && IsTimeoutError(streamPart.GetStatus())) {
+ throw TStreamReadError(streamPart.GetStatus());
+ }
UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString());
break;
}
@@ -609,11 +616,11 @@ TString StreamResultToYsonImpl(TIterator& it, TVector<TString>* profiles) {
return out.Str();
}
-TString StreamResultToYson(NYdb::NTable::TScanQueryPartIterator& it) {
- return StreamResultToYsonImpl(it, nullptr);
+TString StreamResultToYson(NYdb::NTable::TScanQueryPartIterator& it, bool throwOnTimeout) {
+ return StreamResultToYsonImpl(it, nullptr, throwOnTimeout);
}
-TString StreamResultToYson(NYdb::NTable::TTablePartIterator& it) {
+TString StreamResultToYson(NYdb::NTable::TTablePartIterator& it, bool throwOnTimeout) {
TStringStream out;
NYson::TYsonWriter writer(&out, NYson::EYsonFormat::Text, ::NYson::EYsonType::Node, true);
writer.OnBeginList();
@@ -623,6 +630,9 @@ TString StreamResultToYson(NYdb::NTable::TTablePartIterator& it) {
for (;;) {
auto streamPart = it.ReadNext().GetValueSync();
if (!streamPart.IsSuccess()) {
+ if (throwOnTimeout && IsTimeoutError(streamPart.GetStatus())) {
+ throw TStreamReadError(streamPart.GetStatus());
+ }
UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString());
break;
}
@@ -638,7 +648,7 @@ TString StreamResultToYson(NYdb::NTable::TTablePartIterator& it) {
return out.Str();
}
-TString StreamResultToYson(NYdb::NScripting::TYqlResultPartIterator& it) {
+TString StreamResultToYson(NYdb::NScripting::TYqlResultPartIterator& it, bool throwOnTimeout) {
TStringStream out;
NYson::TYsonWriter writer(&out, NYson::EYsonFormat::Text, ::NYson::EYsonType::Node, true);
writer.OnBeginList();
@@ -650,6 +660,9 @@ TString StreamResultToYson(NYdb::NScripting::TYqlResultPartIterator& it) {
for (;;) {
auto streamPart = it.ReadNext().GetValueSync();
if (!streamPart.IsSuccess()) {
+ if (throwOnTimeout && IsTimeoutError(streamPart.GetStatus())) {
+ throw TStreamReadError(streamPart.GetStatus());
+ }
UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString());
break;
}
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h
index 2bc1b5f6d7..b1793deb47 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.h
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.h
@@ -235,9 +235,17 @@ inline NYdb::NTable::TDataQueryResult ExecQueryAndTestResult(NYdb::NTable::TSess
return ExecQueryAndTestResult(session, query, NYdb::TParamsBuilder().Build(), expectedYson);
}
-TString StreamResultToYson(NYdb::NTable::TScanQueryPartIterator& it);
-TString StreamResultToYson(NYdb::NScripting::TYqlResultPartIterator& it);
-TString StreamResultToYson(NYdb::NTable::TTablePartIterator& it);
+class TStreamReadError : public yexception {
+public:
+ TStreamReadError(NYdb::EStatus status)
+ : Status(status)
+ {}
+ NYdb::EStatus Status;
+};
+
+TString StreamResultToYson(NYdb::NTable::TScanQueryPartIterator& it, bool throwOnTImeout = false);
+TString StreamResultToYson(NYdb::NScripting::TYqlResultPartIterator& it, bool throwOnTImeout = false);
+TString StreamResultToYson(NYdb::NTable::TTablePartIterator& it, bool throwOnTImeout = false);
ui32 CountPlanNodesByKv(const NJson::TJsonValue& plan, const TString& key, const TString& value);
NJson::TJsonValue FindPlanNodeByKv(const NJson::TJsonValue& plan, const TString& key, const TString& value);
diff --git a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
index 41959f5bcd..f0612e4aa2 100644
--- a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
+++ b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
@@ -128,6 +128,43 @@ Y_UNIT_TEST_SUITE(KqpScan) {
UNIT_ASSERT_C(count, "Unable to wait for proper active session count, it looks like cancelation doesn`t work");
}
+ Y_UNIT_TEST(StreamExecuteScanQueryTimeoutBruteForce) {
+ TKikimrRunner kikimr;
+ NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
+
+ int maxTimeoutMs = 100;
+
+ for (int i = 1; i < maxTimeoutMs; i++) {
+ auto it = kikimr.GetTableClient().StreamExecuteScanQuery(R"(
+ SELECT * FROM `/Root/EightShard` WHERE Text = "Value1" ORDER BY Key;
+ )",
+ TStreamExecScanQuerySettings()
+ .ClientTimeout(TDuration::MilliSeconds(i))
+ ).GetValueSync();
+
+ if (it.IsSuccess()) {
+ try {
+ auto yson = StreamResultToYson(it, true);
+ CompareYson(R"([[[1];[101u];["Value1"]];[[2];[201u];["Value1"]];[[3];[301u];["Value1"]];[[1];[401u];["Value1"]];[[2];[501u];["Value1"]];[[3];[601u];["Value1"]];[[1];[701u];["Value1"]];[[2];[801u];["Value1"]]])", yson);
+ } catch (const TStreamReadError& ex) {
+ UNIT_ASSERT_VALUES_EQUAL(ex.Status, NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED);
+ } catch (const std::exception& ex) {
+ UNIT_ASSERT_C(false, "unknown exception during the test");
+ }
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(it.GetStatus(), NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED);
+ }
+ }
+
+ int count = 60;
+ while (counters.GetActiveSessionActors()->Val() != 0 && count) {
+ count--;
+ Sleep(TDuration::Seconds(1));
+ }
+
+ UNIT_ASSERT_C(count, "Unable to wait for proper active session count, it looks like cancelation doesn`t work");
+ }
+
Y_UNIT_TEST(IsNull) {
auto kikimr = DefaultKikimrRunner();
CreateNullSampleTables(kikimr);
diff --git a/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp b/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp
index 5474d8649b..3e4488a6ad 100644
--- a/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp
+++ b/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp
@@ -563,6 +563,53 @@ Y_UNIT_TEST_SUITE(KqpScripting) {
UNIT_ASSERT_C(count, "Unable to wait for proper active session count, it looks like cancelation doesn`t work");
}
+ Y_UNIT_TEST(StreamExecuteYqlScriptScanTimeoutBruteForce) {
+ TKikimrRunner kikimr;
+ NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
+
+ TScriptingClient client(kikimr.GetDriver());
+
+ int maxTimeoutMs = 1000;
+
+ for (int i = 1; i < maxTimeoutMs; i++) {
+ auto it = client.StreamExecuteYqlScript(R"(
+ SELECT * FROM `/Root/EightShard` WHERE Text = "Value1" ORDER BY Key;
+ )",
+ TExecuteYqlRequestSettings()
+ .ClientTimeout(TDuration::MilliSeconds(i))
+ ).GetValueSync();
+
+ if (it.IsSuccess()) {
+ try {
+ auto yson = StreamResultToYson(it, true);
+ CompareYson(R"([[[[1];[101u];["Value1"]];[[2];[201u];["Value1"]];[[3];[301u];["Value1"]];[[1];[401u];["Value1"]];[[2];[501u];["Value1"]];[[3];[601u];["Value1"]];[[1];[701u];["Value1"]];[[2];[801u];["Value1"]]]])", yson);
+ } catch (const TStreamReadError& ex) {
+ if (ex.Status != NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED && ex.Status != NYdb::EStatus::TIMEOUT) {
+ TStringStream msg;
+ msg << "unexpected status: " << ex.Status;
+ UNIT_ASSERT_C(false, msg.Str().data());
+ }
+ } catch (const std::exception& ex) {
+ auto msg = TString("unknown exception during the test: ") + ex.what();
+ UNIT_ASSERT_C(false, msg.data());
+ }
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(it.GetStatus(), NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED);
+ }
+ }
+// We unable to close worker actor on timeout right now
+#if 0
+ int count = 60;
+ while (counters.GetActiveSessionActors()->Val() != 0 && count) {
+ Cerr << "SESSIONS: " << counters.GetActiveSessionActors()->Val() << Endl;
+ count--;
+ Sleep(TDuration::Seconds(1));
+ }
+
+ UNIT_ASSERT_C(count, "Unable to wait for proper active session count, it looks like cancelation doesn`t work");
+#endif
+ }
+
Y_UNIT_TEST(StreamExecuteYqlScriptScanScalar) {
TKikimrRunner kikimr;
TScriptingClient client(kikimr.GetDriver());